This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ba99dc4310 [Logical Table] Support logical tables in MSE. (#15773) ba99dc4310 is described below commit ba99dc4310615e4a299c278bf336a74d79dc71ce Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Fri May 30 19:24:31 2025 +0530 [Logical Table] Support logical tables in MSE. (#15773) --- .../api/resources/PinotQueryResource.java | 54 +++++++-- .../helix/core/PinotHelixResourceManager.java | 11 ++ .../core/data/manager/InstanceDataManager.java | 6 + .../core/data/manager/LogicalTableContext.java | 55 +++++++++ .../BaseLogicalTableIntegrationTest.java | 115 +++++++++++++++++-- .../apache/pinot/query/catalog/PinotCatalog.java | 11 +- .../planner/physical/DispatchablePlanContext.java | 7 ++ .../planner/physical/DispatchablePlanMetadata.java | 26 +++++ .../planner/physical/DispatchablePlanVisitor.java | 17 ++- .../apache/pinot/query/routing/WorkerManager.java | 96 +++++++++++++++- .../apache/pinot/query/routing/WorkerMetadata.java | 27 ++++- .../table/ImplicitHybridTableRouteProvider.java | 17 ++- .../query/routing/table/LogicalTableRouteInfo.java | 53 +++++---- .../routing/table/LogicalTableRouteProvider.java | 94 ++++++++++------ .../routing/table/PhysicalTableRouteProvider.java | 1 - .../timeboundary/MinTimeBoundaryStrategy.java | 47 ++++---- .../query/timeboundary/TimeBoundaryStrategy.java | 12 +- .../query/routing/table/BaseTableRouteTest.java | 2 +- ...HybridTableRouteProviderCalculateRouteTest.java | 17 ++- ...tHybridTableRouteProviderGetTableRouteTest.java | 65 +++-------- .../timeboundary/MinTimeBoundaryStrategyTest.java | 5 +- .../plan/server/ServerPlanRequestUtils.java | 124 ++++++++++++++++++--- .../starter/helix/HelixInstanceDataManager.java | 40 +++++++ 23 files changed, 719 insertions(+), 183 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java index 73d880570e..ceaaa8bf07 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java @@ -69,6 +69,7 @@ import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.parser.utils.ParserUtils; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.exception.DatabaseConflictException; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryErrorMessage; @@ -240,12 +241,30 @@ public class PinotQueryResource { List<String> instanceIds; if (!tableNames.isEmpty()) { List<TableConfig> tableConfigList = getListTableConfigs(tableNames, database); - if (tableConfigList == null || tableConfigList.isEmpty()) { - throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException("Unable to find table in cluster, table does not exist"); + List<LogicalTableConfig> logicalTableConfigList = null; + // First check for table configs, if not found, check for logical table configs. + if (tableConfigList.size() != tableNames.size()) { + logicalTableConfigList = getListLogicalTableConfigs(tableNames, database); + // If config is not found for all tables, then find the tables that are not found. + if ((tableConfigList.size() + logicalTableConfigList.size()) != tableNames.size()) { + Set<String> tableNamesFoundSet = new HashSet<>(); + for (TableConfig tableConfig : tableConfigList) { + tableNamesFoundSet.add(tableConfig.getTableName()); + } + for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) { + tableNamesFoundSet.add(logicalTableConfig.getTableName()); + } + + List<String> tablesNotFound = tableNames.stream().filter(name -> !tableNamesFoundSet.contains(name)) + .collect(Collectors.toList()); + + throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException( + "Unable to find table in cluster, table does not exist for tables: " + tablesNotFound); + } } // find the unions of all the broker tenant tags of the queried tables. - Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList); + Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList, logicalTableConfigList); if (brokerTenantsUnion.isEmpty()) { throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find broker tenant for tables: " + tableNames); } @@ -324,14 +343,28 @@ public class PinotQueryResource { if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) { tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName))); } - if (tableConfigList.isEmpty()) { - return null; + // If no table configs found for the table, skip it. + if (!tableConfigList.isEmpty()) { + allTableConfigList.addAll(tableConfigList); } - allTableConfigList.addAll(tableConfigList); } return allTableConfigList; } + private List<LogicalTableConfig> getListLogicalTableConfigs(List<String> tableNames, String database) { + List<LogicalTableConfig> allLogicalTableConfigList = new ArrayList<>(); + for (String tableName : tableNames) { + String actualTableName = _pinotHelixResourceManager.getActualLogicalTableName(tableName, database); + LogicalTableConfig logicalTableConfig = + _pinotHelixResourceManager.getLogicalTableConfig(actualTableName); + if (logicalTableConfig != null) { + allLogicalTableConfigList.add(logicalTableConfig); + } + } + return allLogicalTableConfigList; + } + + private String selectRandomInstanceId(List<String> instanceIds) { if (instanceIds.isEmpty()) { throw QueryErrorCode.BROKER_RESOURCE_MISSING.asException("No broker found for query"); @@ -356,11 +389,18 @@ public class PinotQueryResource { } // return the union of brokerTenants from the tables list. - private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList) { + private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList, + @Nullable List<LogicalTableConfig> logicalTableConfigList) { Set<String> tableBrokerTenants = new HashSet<>(); for (TableConfig tableConfig : tableConfigList) { tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker()); } + + if (logicalTableConfigList != null) { + for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) { + tableBrokerTenants.add(logicalTableConfig.getBrokerTenant()); + } + } return tableBrokerTenants; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 0a4b2d37f3..ef823d55bf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -906,6 +906,17 @@ public class PinotHelixResourceManager { return actualTableName != null ? actualTableName : tableName; } + /** + * Given a logical table name in any case, returns the logical table name as defined in Helix/Segment/Schema + * @param logicalTableName logical tableName in any case. + * @return logicalTableName actually defined in Pinot (matches case) and exists ,else, return the input value + */ + public String getActualLogicalTableName(String logicalTableName, @Nullable String databaseName) { + logicalTableName = DatabaseUtils.translateTableName(logicalTableName, databaseName, _tableCache.isIgnoreCase()); + String actualTableName = _tableCache.getActualLogicalTableName(logicalTableName); + return actualTableName != null ? actualTableName : logicalTableName; + } + /** * Table related APIs */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index d5572dce52..15ef79f465 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -201,4 +201,10 @@ public interface InstanceDataManager { * Returns the instance data directory */ String getInstanceDataDir(); + + /** + * Returns the logical table config and schema for the given logical table name. + */ + @Nullable + LogicalTableContext getLogicalTableContext(String logicalTableName); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java new file mode 100644 index 0000000000..86592f2db7 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager; + +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.Schema; + + +public class LogicalTableContext { + private final LogicalTableConfig _logicalTableConfig; + private final Schema _logicalTableSchema; + private final TableConfig _refOfflineTableConfig; + private final TableConfig _refRealtimeTableConfig; + + public LogicalTableContext(LogicalTableConfig logicalTableConfig, Schema logicalTableSchema, + TableConfig refOfflineTableConfig, TableConfig refRealtimeTableConfig) { + _logicalTableConfig = logicalTableConfig; + _logicalTableSchema = logicalTableSchema; + _refOfflineTableConfig = refOfflineTableConfig; + _refRealtimeTableConfig = refRealtimeTableConfig; + } + + public LogicalTableConfig getLogicalTableConfig() { + return _logicalTableConfig; + } + + public Schema getLogicalTableSchema() { + return _logicalTableSchema; + } + + public TableConfig getRefOfflineTableConfig() { + return _refOfflineTableConfig; + } + + public TableConfig getRefRealtimeTableConfig() { + return _refRealtimeTableConfig; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java index 7fb4802cc1..0dcbb3a9f3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java @@ -34,6 +34,7 @@ import org.apache.pinot.controller.helix.ControllerRequestClient; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet; import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.integration.tests.QueryAssert; import org.apache.pinot.integration.tests.QueryGenerator; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -48,6 +49,7 @@ import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; +import org.intellij.lang.annotations.Language; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -67,6 +69,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra private static final String DEFAULT_TENANT = "DefaultTenant"; private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable"; protected static final String DEFAULT_TABLE_NAME = "physicalTable"; + protected static final String EMPTY_OFFLINE_TABLE_NAME = "empty_o"; protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite = null; protected List<File> _avroFiles; @@ -111,6 +114,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra _controllerRequestURLBuilder = _sharedClusterTestSuite._controllerRequestURLBuilder; _helixResourceManager = _sharedClusterTestSuite._helixResourceManager; _kafkaStarters = _sharedClusterTestSuite._kafkaStarters; + _controllerBaseApiUrl = _sharedClusterTestSuite._controllerBaseApiUrl; } _avroFiles = getAllAvroFiles(); @@ -164,6 +168,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra // Wait for all documents loaded waitForAllDocsLoaded(600_000L); + createLogicalTableWithEmptyOfflineTable(); } @AfterClass @@ -250,6 +255,13 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra return DEFAULT_TENANT; } + // Setup H2 table with the same name as the logical table. + protected void setUpH2Connection(List<File> avroFiles) + throws Exception { + setUpH2Connection(); + ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles, getLogicalTableName(), _h2Connection); + } + /** * Creates a new OFFLINE table config. */ @@ -324,12 +336,35 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra return LogicalTableConfig.fromString(resp); } - protected void deleteLogicalTable() + private void createLogicalTableWithEmptyOfflineTable() throws IOException { - String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName()); - // delete logical table - String deleteResponse = ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders()); - assertEquals(deleteResponse, "{\"status\":\"" + getLogicalTableName() + " logical table successfully deleted.\"}"); + Schema schema = createSchema(getSchemaFileName()); + schema.setSchemaName(TableNameBuilder.extractRawTableName(EMPTY_OFFLINE_TABLE_NAME)); + addSchema(schema); + + Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); + TableConfig offlineTableConfig = createOfflineTableConfig(EMPTY_OFFLINE_TABLE_NAME); + addTableConfig(offlineTableConfig); + physicalTableConfigMap.put(TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME), + new PhysicalTableConfig()); + String refOfflineTableName = TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME); + + String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical"; + + String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + Schema logicalTableSchema = createSchema(getSchemaFileName()); + logicalTableSchema.setSchemaName(logicalTableName); + addSchema(logicalTableSchema); + LogicalTableConfigBuilder builder = + new LogicalTableConfigBuilder().setTableName(logicalTableName) + .setBrokerTenant(DEFAULT_TENANT) + .setRefOfflineTableName(refOfflineTableName) + .setPhysicalTableConfigMap(physicalTableConfigMap); + + String resp = + ControllerTest.sendPostRequest(addLogicalTableUrl, builder.build().toSingleLineJsonString(), getHeaders()); + assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + + " logical table successfully added.\"}"); } @Override @@ -414,22 +449,24 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra assertEquals(new HashSet<>(getPhysicalTableNames()), logicalTableConfig.getPhysicalTableConfigMap().keySet()); } - @Test - public void testHardcodedQueries() + @Test(dataProvider = "useBothQueryEngines") + public void testHardcodedQueries(boolean useMultiStageQueryEngine) throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); super.testHardcodedQueries(); } - @Test public void testQueriesFromQueryFile() throws Exception { + setUseMultiStageQueryEngine(false); super.testQueriesFromQueryFile(); } - @Test - public void testGeneratedQueries() + @Test(dataProvider = "useBothQueryEngines") + public void testGeneratedQueries(boolean useMultiStageQueryEngine) throws Exception { - super.testGeneratedQueries(true, false); + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + super.testGeneratedQueries(true, useMultiStageQueryEngine); } @Test @@ -558,4 +595,60 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); } + + @Test(dataProvider = "useBothQueryEngines") + public void testLogicalTableWithEmptyOfflineTable(boolean useMultiStageQueryEngine) + throws Exception { + + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + + String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical"; + // Query should return empty result + JsonNode queryResponse = postQuery("SELECT count(*) FROM " + logicalTableName); + assertEquals(queryResponse.get("numDocsScanned").asInt(), 0); + assertEquals(queryResponse.get("numServersQueried").asInt(), useMultiStageQueryEngine ? 1 : 0); + assertTrue(queryResponse.get("exceptions").isEmpty()); + } + + @Test(dataProvider = "useBothQueryEngines") + void testControllerQuerySubmit(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + @Language("sql") + String query = "SELECT count(*) FROM " + getLogicalTableName(); + JsonNode response = postQueryToController(query); + assertNoError(response); + + query = "SELECT count(*) FROM " + getOfflineTableNames().get(0); + response = postQueryToController(query); + assertNoError(response); + + query = "SELECT count(*) FROM unknown"; + response = postQueryToController(query); + QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST) + .containsMessage("TableDoesNotExistError"); + } + + @Test + void testControllerJoinQuerySubmit() + throws Exception { + setUseMultiStageQueryEngine(true); + @Language("sql") + String query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN " + getPhysicalTableNames().get(0) + + " ON " + getLogicalTableName() + ".FlightNum = " + getPhysicalTableNames().get(0) + ".FlightNum"; + JsonNode response = postQueryToController(query); + assertNoError(response); + + query = "SELECT count(*) FROM unknown JOIN " + getPhysicalTableNames().get(0) + + " ON unknown.FlightNum = " + getPhysicalTableNames().get(0) + ".FlightNum"; + response = postQueryToController(query); + QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST) + .containsMessage("TableDoesNotExistError"); + + query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN known ON " + + getLogicalTableName() + ".FlightNum = unknown.FlightNum"; + response = postQueryToController(query); + QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST) + .containsMessage("TableDoesNotExistError"); + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java index da40d6c48c..c4ba328cee 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java @@ -21,6 +21,7 @@ package org.apache.pinot.query.catalog; import java.util.Collection; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.rel.type.RelProtoDataType; @@ -68,10 +69,14 @@ public class PinotCatalog implements Schema { String rawTableName = TableNameBuilder.extractRawTableName(name); String physicalTableName = DatabaseUtils.translateTableName(rawTableName, _databaseName); String tableName = _tableCache.getActualTableName(physicalTableName); + if (tableName == null) { - return null; + tableName = _tableCache.getActualLogicalTableName(physicalTableName); } + if (tableName == null) { + return null; + } org.apache.pinot.spi.data.Schema schema = _tableCache.getSchema(tableName); if (schema == null) { return null; @@ -86,7 +91,9 @@ public class PinotCatalog implements Schema { */ @Override public Set<String> getTableNames() { - return _tableCache.getTableNameMap().keySet().stream().filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName)) + return Stream.concat(_tableCache.getTableNameMap().keySet().stream(), + _tableCache.getLogicalTableNameMap().keySet().stream()) + .filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName)) .collect(Collectors.toSet()); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java index 5aaf277858..fbace37191 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java @@ -103,8 +103,12 @@ public class DispatchablePlanContext { dispatchablePlanMetadata.getWorkerIdToServerInstanceMap(); Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = dispatchablePlanMetadata.getWorkerIdToSegmentsMap(); + Map<Integer, Map<String, List<String>>> workerIdToTableNameSegmentsMap = + dispatchablePlanMetadata.getWorkerIdToTableSegmentsMap(); Map<Integer, Map<Integer, MailboxInfos>> workerIdToMailboxesMap = dispatchablePlanMetadata.getWorkerIdToMailboxesMap(); + Preconditions.checkArgument(workerIdToSegmentsMap == null || workerIdToTableNameSegmentsMap == null, + "Both workerIdToSegmentsMap and workerIdToTableNameSegmentsMap cannot be set at the same time"); Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = new HashMap<>(); WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()]; for (Map.Entry<Integer, QueryServerInstance> serverEntry : workerIdToServerInstanceMap.entrySet()) { @@ -115,6 +119,9 @@ public class DispatchablePlanContext { if (workerIdToSegmentsMap != null) { workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId)); } + if (workerIdToTableNameSegmentsMap != null) { + workerMetadata.setLogicalTableSegmentsMap(workerIdToTableNameSegmentsMap.get(workerId)); + } workerMetadataArray[workerId] = workerMetadata; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java index 0c27906ee6..ec88a8e4f4 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.routing.MailboxInfos; import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.query.routing.table.LogicalTableRouteInfo; /** @@ -80,6 +81,12 @@ public class DispatchablePlanMetadata implements Serializable { // Map from workerId -> {planFragmentId -> mailboxes} private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap = new HashMap<>(); + /** + * Map from workerId -> {physicalTableName -> segments} is required for logical tables. + */ + private Map<Integer, Map<String, List<String>>> _workerIdToTableSegmentsMap; + private LogicalTableRouteInfo _logicalTableRouteInfo; + public List<String> getScannedTables() { return _scannedTables; } @@ -178,4 +185,23 @@ public class DispatchablePlanMetadata implements Serializable { public void addUnavailableSegments(String tableName, Collection<String> unavailableSegments) { _tableToUnavailableSegmentsMap.computeIfAbsent(tableName, k -> new HashSet<>()).addAll(unavailableSegments); } + + @Nullable + public LogicalTableRouteInfo getLogicalTableRouteInfo() { + return _logicalTableRouteInfo; + } + + public void setLogicalTableRouteInfo(LogicalTableRouteInfo logicalTableRouteInfo) { + _logicalTableRouteInfo = logicalTableRouteInfo; + } + + @Nullable + public Map<Integer, Map<String, List<String>>> getWorkerIdToTableSegmentsMap() { + return _workerIdToTableSegmentsMap; + } + + public void setWorkerIdToTableSegmentsMap( + Map<Integer, Map<String, List<String>>> workerIdToTableSegmentsMap) { + _workerIdToTableSegmentsMap = workerIdToTableSegmentsMap; + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java index c79a6bec68..848bea8fbc 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.planner.physical; +import com.google.common.base.Preconditions; import java.util.Collections; import java.util.IdentityHashMap; import java.util.Set; @@ -38,6 +39,8 @@ import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; import org.apache.pinot.query.planner.plannode.ValueNode; import org.apache.pinot.query.planner.plannode.WindowNode; +import org.apache.pinot.query.routing.table.LogicalTableRouteInfo; +import org.apache.pinot.query.routing.table.LogicalTableRouteProvider; public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, DispatchablePlanContext> { @@ -137,7 +140,19 @@ public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, Dispatchab @Override public Void visitTableScan(TableScanNode node, DispatchablePlanContext context) { DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context); - dispatchablePlanMetadata.addScannedTable(_tableCache.getActualTableName(node.getTableName())); + + String tableNameInNode = node.getTableName(); + String tableName = _tableCache.getActualTableName(tableNameInNode); + if (tableName == null) { + tableName = _tableCache.getActualLogicalTableName(tableNameInNode); + Preconditions.checkNotNull(tableName, "Logical table config not found in table cache: " + tableNameInNode); + LogicalTableRouteProvider tableRouteProvider = new LogicalTableRouteProvider(); + LogicalTableRouteInfo logicalTableRouteInfo = new LogicalTableRouteInfo(); + tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo, tableName, _tableCache); + dispatchablePlanMetadata.setLogicalTableRouteInfo(logicalTableRouteInfo); + } + + dispatchablePlanMetadata.addScannedTable(tableName); dispatchablePlanMetadata.setTableOptions( node.getNodeHint().getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS)); return null; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index b474b06b4e..4a28f11632 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -35,18 +35,22 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.rules.ImmutableTableOptions; import org.apache.pinot.calcite.rel.rules.TableOptions; +import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.TableRouteInfo; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.physical.DispatchablePlanContext; import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.apache.pinot.query.routing.table.LogicalTableRouteInfo; +import org.apache.pinot.query.routing.table.LogicalTableRouteProvider; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -373,7 +377,11 @@ public class WorkerManager { DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId()); Map<String, String> tableOptions = metadata.getTableOptions(); if (tableOptions == null) { - assignWorkersToNonPartitionedLeafFragment(metadata, context); + if (metadata.getLogicalTableRouteInfo() != null) { + assignWorkersToNonPartitionedLeafFragmentForLogicalTable(metadata, context); + } else { + assignWorkersToNonPartitionedLeafFragment(metadata, context); + } return; } @@ -392,7 +400,11 @@ public class WorkerManager { if (Boolean.parseBoolean(tableOptions.get(PinotHintOptions.TableHintOptions.IS_REPLICATED))) { setSegmentsForReplicatedLeafFragment(metadata); } else { - assignWorkersToNonPartitionedLeafFragment(metadata, context); + if (metadata.getLogicalTableRouteInfo() != null) { + assignWorkersToNonPartitionedLeafFragmentForLogicalTable(metadata, context); + } else { + assignWorkersToNonPartitionedLeafFragment(metadata, context); + } } } @@ -539,6 +551,86 @@ public class WorkerManager { CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + tableNameWithType + "\"")); } + private void assignWorkersToNonPartitionedLeafFragmentForLogicalTable(DispatchablePlanMetadata metadata, + DispatchablePlanContext context) { + LogicalTableRouteInfo logicalTableRouteInfo = metadata.getLogicalTableRouteInfo(); + Preconditions.checkNotNull(logicalTableRouteInfo); + LogicalTableRouteProvider tableRouteProvider = new LogicalTableRouteProvider(); + tableRouteProvider.fillRouteMetadata(logicalTableRouteInfo, _routingManager); + if (logicalTableRouteInfo.getTimeBoundaryInfo() != null) { + metadata.setTimeBoundaryInfo(logicalTableRouteInfo.getTimeBoundaryInfo()); + } + BrokerRequest offlineBrokerRequest = null; + BrokerRequest realtimeBrokerRequest = null; + + if (logicalTableRouteInfo.hasOffline()) { + offlineBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest( + "SELECT * FROM \"" + logicalTableRouteInfo.getOfflineTableName() + "\""); + } + + if (logicalTableRouteInfo.hasRealtime()) { + realtimeBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest( + "SELECT * FROM \"" + logicalTableRouteInfo.getRealtimeTableName() + "\""); + } + + tableRouteProvider.calculateRoutes(logicalTableRouteInfo, _routingManager, offlineBrokerRequest, + realtimeBrokerRequest, context.getRequestId()); + + assignTableSegmentsToWorkers(logicalTableRouteInfo, metadata); + } + + private static void assignTableSegmentsToWorkers(LogicalTableRouteInfo logicalTableRouteInfo, + DispatchablePlanMetadata metadata) { + Map<ServerInstance, Map<String, List<String>>> serverInstanceToLogicalSegmentsMap = + new HashMap<>(); + + if (logicalTableRouteInfo.getOfflineTables() != null) { + for (TableRouteInfo physicalTableRoute : logicalTableRouteInfo.getOfflineTables()) { + // Routing table maybe null if no routing table is found OR there are no segments. + if (physicalTableRoute.getOfflineRoutingTable() != null) { + transferToServerInstanceLogicalSegmentsMap(physicalTableRoute.getOfflineTableName(), + physicalTableRoute.getOfflineRoutingTable(), serverInstanceToLogicalSegmentsMap); + } + } + } + + if (logicalTableRouteInfo.getRealtimeTables() != null) { + for (TableRouteInfo physicalTableRoute : logicalTableRouteInfo.getRealtimeTables()) { + // Routing table maybe null if no routing table is found OR there are no segments. + if (physicalTableRoute.getRealtimeRoutingTable() != null) { + transferToServerInstanceLogicalSegmentsMap(physicalTableRoute.getRealtimeTableName(), + physicalTableRoute.getRealtimeRoutingTable(), serverInstanceToLogicalSegmentsMap); + } + } + } + + int workerId = 0; + Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>(); + Map<Integer, Map<String, List<String>>> workerIdToLogicalTableSegmentsMap = new HashMap<>(); + for (Map.Entry<ServerInstance, Map<String, List<String>>> entry + : serverInstanceToLogicalSegmentsMap.entrySet()) { + workerIdToServerInstanceMap.put(workerId, new QueryServerInstance(entry.getKey())); + workerIdToLogicalTableSegmentsMap.put(workerId, entry.getValue()); + workerId++; + } + + metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap); + metadata.setWorkerIdToTableSegmentsMap(workerIdToLogicalTableSegmentsMap); + } + + private static void transferToServerInstanceLogicalSegmentsMap(String physicalTableName, + Map<ServerInstance, ServerRouteInfo> segmentsMap, + Map<ServerInstance, Map<String, List<String>>> serverInstanceToLogicalSegmentsMap) { + for (Map.Entry<ServerInstance, ServerRouteInfo> serverEntry : segmentsMap.entrySet()) { + Map<String, List<String>> tableNameToSegmentsMap = + serverInstanceToLogicalSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); + // TODO: support optional segments for multi-stage engine. + Preconditions.checkState( + tableNameToSegmentsMap.put(physicalTableName, serverEntry.getValue().getSegments()) == null, + "Entry for server {} and physical table: {} already exist!", serverEntry.getKey(), physicalTableName); + } + } + // -------------------------------------------------------------------------- // Partitioned leaf stage assignment // -------------------------------------------------------------------------- diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java index ab1226862e..b00a31294e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java @@ -43,6 +43,7 @@ import org.apache.pinot.spi.utils.JsonUtils; */ public class WorkerMetadata { public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap"; + public static final String LOGICAL_TABLE_SEGMENTS_MAP_KEY = "logicalTableSegmentsMap"; private final int _workerId; private final Map<Integer, MailboxInfos> _mailboxInfosMap; @@ -75,13 +76,17 @@ public class WorkerMetadata { @Nullable public Map<String, List<String>> getTableSegmentsMap() { - String tableSegmentsMapStr = _customProperties.get(TABLE_SEGMENTS_MAP_KEY); + return deserializeStringSegmentListMap(TABLE_SEGMENTS_MAP_KEY); + } + + private Map<String, List<String>> deserializeStringSegmentListMap(String propertyKey) { + String tableSegmentsMapStr = _customProperties.get(propertyKey); if (tableSegmentsMapStr != null) { try { return JsonUtils.stringToObject(tableSegmentsMapStr, new TypeReference<Map<String, List<String>>>() { }); } catch (IOException e) { - throw new RuntimeException("Unable to deserialize table segments map: " + tableSegmentsMapStr, e); + throw new RuntimeException("Unable to deserialize " + propertyKey + " : " + tableSegmentsMapStr, e); } } else { return null; @@ -89,7 +94,8 @@ public class WorkerMetadata { } public boolean isLeafStageWorker() { - return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY); + return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY) + || _customProperties.containsKey(LOGICAL_TABLE_SEGMENTS_MAP_KEY); } public void setTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) { @@ -101,4 +107,19 @@ public class WorkerMetadata { } _customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr); } + + @Nullable + public Map<String, List<String>> getLogicalTableSegmentsMap() { + return deserializeStringSegmentListMap(LOGICAL_TABLE_SEGMENTS_MAP_KEY); + } + + public void setLogicalTableSegmentsMap(Map<String, List<String>> logicalTableSegmentsMap) { + String logicalTableSegmentsMapStr; + try { + logicalTableSegmentsMapStr = JsonUtils.objectToString(logicalTableSegmentsMap); + } catch (JsonProcessingException e) { + throw new RuntimeException("Unable to serialize table segments map: " + logicalTableSegmentsMap, e); + } + _customProperties.put(LOGICAL_TABLE_SEGMENTS_MAP_KEY, logicalTableSegmentsMapStr); + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java index 63543c41ce..48553c328d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java @@ -46,10 +46,18 @@ public class ImplicitHybridTableRouteProvider implements TableRouteProvider { private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitHybridTableRouteProvider.class); @Override - public ImplicitHybridTableRouteInfo getTableRouteInfo(String tableName, TableCache tableCache, + public TableRouteInfo getTableRouteInfo(String tableName, TableCache tableCache, RoutingManager routingManager) { ImplicitHybridTableRouteInfo tableRouteInfo = new ImplicitHybridTableRouteInfo(); + fillTableConfigMetadata(tableRouteInfo, tableName, tableCache); + fillRouteMetadata(tableRouteInfo, routingManager); + + return tableRouteInfo; + } + + public void fillTableConfigMetadata(ImplicitHybridTableRouteInfo tableRouteInfo, + String tableName, TableCache tableCache) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); if (tableType == TableType.OFFLINE) { @@ -71,19 +79,24 @@ public class ImplicitHybridTableRouteProvider implements TableRouteProvider { if (realtimeTableName != null) { tableRouteInfo.setRealtimeTableConfig(tableCache.getTableConfig(realtimeTableName)); } + } + public void fillRouteMetadata(ImplicitHybridTableRouteInfo tableRouteInfo, RoutingManager routingManager) { if (tableRouteInfo.hasOffline()) { + String offlineTableName = tableRouteInfo.getOfflineTableName(); tableRouteInfo.setOfflineRouteExists(routingManager.routingExists(offlineTableName)); tableRouteInfo.setOfflineTableDisabled(routingManager.isTableDisabled(offlineTableName)); } if (tableRouteInfo.hasRealtime()) { + String realtimeTableName = tableRouteInfo.getRealtimeTableName(); tableRouteInfo.setRealtimeRouteExists(routingManager.routingExists(realtimeTableName)); tableRouteInfo.setRealtimeTableDisabled(routingManager.isTableDisabled(realtimeTableName)); } // Get TimeBoundaryInfo. If there is no time boundary, then do not consider the offline table. if (tableRouteInfo.isHybrid()) { + String offlineTableName = tableRouteInfo.getOfflineTableName(); // Time boundary info might be null when there is no segment in the offline table, query real-time side only TimeBoundaryInfo timeBoundaryInfo = routingManager.getTimeBoundaryInfo(offlineTableName); if (timeBoundaryInfo == null) { @@ -95,8 +108,6 @@ public class ImplicitHybridTableRouteProvider implements TableRouteProvider { tableRouteInfo.setTimeBoundaryInfo(timeBoundaryInfo); } } - - return tableRouteInfo; } @Override diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java index 04d3b76bed..612f8e79cd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java @@ -32,22 +32,23 @@ import org.apache.pinot.common.request.TableSegmentsInfo; import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.BaseTableRouteInfo; +import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.transport.TableRouteInfo; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; public class LogicalTableRouteInfo extends BaseTableRouteInfo { - private final LogicalTableConfig _logicalTable; - private List<TableRouteInfo> _offlineTables; - private List<TableRouteInfo> _realtimeTables; + private String _logicalTableName; + private List<ImplicitHybridTableRouteInfo> _offlineTables; + private List<ImplicitHybridTableRouteInfo> _realtimeTables; private TableConfig _offlineTableConfig; private TableConfig _realtimeTableConfig; private QueryConfig _queryConfig; @@ -56,15 +57,9 @@ public class LogicalTableRouteInfo extends BaseTableRouteInfo { private BrokerRequest _offlineBrokerRequest; private BrokerRequest _realtimeBrokerRequest; - private TimeBoundaryInfo _timeBoundaryInfo; - - LogicalTableRouteInfo() { - _logicalTable = null; - } - public LogicalTableRouteInfo(LogicalTableConfig logicalTable) { - _logicalTable = logicalTable; - } + private TimeBoundaryStrategy _timeBoundaryStrategy; + private TimeBoundaryInfo _timeBoundaryInfo; @Override public Map<ServerRoutingInstance, InstanceRequest> getRequestMap(long requestId, String brokerId, boolean preferTls) { @@ -139,6 +134,15 @@ public class LogicalTableRouteInfo extends BaseTableRouteInfo { return instanceRequest; } + public void setLogicalTableName(String logicalTableName) { + _logicalTableName = logicalTableName; + } + + @Nullable + public String getLogicalTableName() { + return _logicalTableName; + } + @Nullable @Override public TableConfig getOfflineTableConfig() { @@ -234,15 +238,15 @@ public class LogicalTableRouteInfo extends BaseTableRouteInfo { @Nullable @Override public String getOfflineTableName() { - return hasOffline() && _logicalTable != null ? TableNameBuilder.OFFLINE.tableNameWithType( - _logicalTable.getTableName()) : null; + return hasOffline() && _logicalTableName != null ? TableNameBuilder.OFFLINE.tableNameWithType(_logicalTableName) + : null; } @Nullable @Override public String getRealtimeTableName() { - return hasRealtime() && _logicalTable != null ? TableNameBuilder.REALTIME.tableNameWithType( - _logicalTable.getTableName()) : null; + return hasRealtime() && _logicalTableName != null ? TableNameBuilder.REALTIME.tableNameWithType(_logicalTableName) + : null; } @Nullable @@ -356,20 +360,20 @@ public class LogicalTableRouteInfo extends BaseTableRouteInfo { } @Nullable - public List<TableRouteInfo> getOfflineTables() { + public List<ImplicitHybridTableRouteInfo> getOfflineTables() { return _offlineTables; } - public void setOfflineTables(List<TableRouteInfo> offlineTables) { + public void setOfflineTables(List<ImplicitHybridTableRouteInfo> offlineTables) { _offlineTables = offlineTables; } @Nullable - public List<TableRouteInfo> getRealtimeTables() { + public List<ImplicitHybridTableRouteInfo> getRealtimeTables() { return _realtimeTables; } - public void setRealtimeTables(List<TableRouteInfo> realtimeTables) { + public void setRealtimeTables(List<ImplicitHybridTableRouteInfo> realtimeTables) { _realtimeTables = realtimeTables; } @@ -388,4 +392,13 @@ public class LogicalTableRouteInfo extends BaseTableRouteInfo { public void setRealtimeBrokerRequest(BrokerRequest realtimeBrokerRequest) { _realtimeBrokerRequest = realtimeBrokerRequest; } + + @Nullable + public TimeBoundaryStrategy getTimeBoundaryStrategy() { + return _timeBoundaryStrategy; + } + + public void setTimeBoundaryStrategy(TimeBoundaryStrategy timeBoundaryStrategy) { + _timeBoundaryStrategy = timeBoundaryStrategy; + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java index fe3937b8b0..bdce62a938 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java @@ -25,6 +25,7 @@ import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo; import org.apache.pinot.core.transport.TableRouteInfo; import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy; import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService; @@ -41,20 +42,29 @@ public class LogicalTableRouteProvider implements TableRouteProvider { @Override public TableRouteInfo getTableRouteInfo(String tableName, TableCache tableCache, RoutingManager routingManager) { - LogicalTableConfig logicalTable = tableCache.getLogicalTableConfig(tableName); - if (logicalTable == null) { - return new LogicalTableRouteInfo(); - } + LogicalTableRouteInfo logicalTableRouteInfo = new LogicalTableRouteInfo(); + fillTableConfigMetadata(logicalTableRouteInfo, tableName, tableCache); + fillRouteMetadata(logicalTableRouteInfo, routingManager); + return logicalTableRouteInfo; + } + public void fillTableConfigMetadata(LogicalTableRouteInfo logicalTableRouteInfo, String tableName, + TableCache tableCache) { + LogicalTableConfig logicalTableConfig = tableCache.getLogicalTableConfig(tableName); + if (logicalTableConfig == null) { + return; + } + logicalTableRouteInfo.setLogicalTableName(tableName); PhysicalTableRouteProvider routeProvider = new PhysicalTableRouteProvider(); - List<TableRouteInfo> offlineTables = new ArrayList<>(); - List<TableRouteInfo> realtimeTables = new ArrayList<>(); - for (String physicalTableName : logicalTable.getPhysicalTableConfigMap().keySet()) { + List<ImplicitHybridTableRouteInfo> offlineTables = new ArrayList<>(); + List<ImplicitHybridTableRouteInfo> realtimeTables = new ArrayList<>(); + for (String physicalTableName : logicalTableConfig.getPhysicalTableConfigMap().keySet()) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(physicalTableName); Preconditions.checkNotNull(tableType); - TableRouteInfo physicalTableInfo = - routeProvider.getTableRouteInfo(physicalTableName, tableCache, routingManager); + ImplicitHybridTableRouteInfo physicalTableInfo = new ImplicitHybridTableRouteInfo(); + routeProvider.fillTableConfigMetadata(physicalTableInfo, physicalTableName, tableCache); + if (physicalTableInfo.isExists()) { if (tableType == TableType.OFFLINE) { offlineTables.add(physicalTableInfo); @@ -64,37 +74,59 @@ public class LogicalTableRouteProvider implements TableRouteProvider { } } - LogicalTableRouteInfo routeInfo = new LogicalTableRouteInfo(logicalTable); if (!offlineTables.isEmpty()) { - TableConfig offlineTableConfig = tableCache.getTableConfig(logicalTable.getRefOfflineTableName()); + TableConfig offlineTableConfig = tableCache.getTableConfig(logicalTableConfig.getRefOfflineTableName()); Preconditions.checkNotNull(offlineTableConfig, - "Offline table config not found: " + logicalTable.getRefOfflineTableName()); - routeInfo.setOfflineTables(offlineTables); - routeInfo.setOfflineTableConfig(offlineTableConfig); + "Offline table config not found: " + logicalTableConfig.getRefOfflineTableName()); + logicalTableRouteInfo.setOfflineTables(offlineTables); + logicalTableRouteInfo.setOfflineTableConfig(offlineTableConfig); } + if (!realtimeTables.isEmpty()) { - TableConfig realtimeTableConfig = tableCache.getTableConfig(logicalTable.getRefRealtimeTableName()); + TableConfig realtimeTableConfig = tableCache.getTableConfig(logicalTableConfig.getRefRealtimeTableName()); Preconditions.checkNotNull(realtimeTableConfig, - "Realtime table config not found: " + logicalTable.getRefRealtimeTableName()); - routeInfo.setRealtimeTables(realtimeTables); - routeInfo.setRealtimeTableConfig(realtimeTableConfig); + "Realtime table config not found: " + logicalTableConfig.getRefRealtimeTableName()); + logicalTableRouteInfo.setRealtimeTables(realtimeTables); + logicalTableRouteInfo.setRealtimeTableConfig(realtimeTableConfig); } - routeInfo.setQueryConfig(logicalTable.getQueryConfig()); - TimeBoundaryInfo timeBoundaryInfo; if (!offlineTables.isEmpty() && !realtimeTables.isEmpty()) { - String boundaryStrategy = logicalTable.getTimeBoundaryConfig().getBoundaryStrategy(); + String boundaryStrategy = logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy(); TimeBoundaryStrategy timeBoundaryStrategy = TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(boundaryStrategy); - timeBoundaryInfo = timeBoundaryStrategy.computeTimeBoundary(logicalTable, tableCache, routingManager); - if (timeBoundaryInfo == null) { - LOGGER.info("No time boundary info found for logical hybrid table: "); - routeInfo.setOfflineTables(null); - } else { - routeInfo.setTimeBoundaryInfo(timeBoundaryInfo); + timeBoundaryStrategy.init(logicalTableConfig, tableCache); + logicalTableRouteInfo.setTimeBoundaryStrategy(timeBoundaryStrategy); + } + + logicalTableRouteInfo.setQueryConfig(logicalTableConfig.getQueryConfig()); + } + + public void fillRouteMetadata(LogicalTableRouteInfo logicalTableRouteInfo, RoutingManager routingManager) { + ImplicitHybridTableRouteProvider tableRouteProvider = new ImplicitHybridTableRouteProvider(); + if (logicalTableRouteInfo.getOfflineTables() != null) { + for (ImplicitHybridTableRouteInfo routeInfo : logicalTableRouteInfo.getOfflineTables()) { + tableRouteProvider.fillRouteMetadata(routeInfo, routingManager); + } + } + + if (logicalTableRouteInfo.getRealtimeTables() != null) { + for (ImplicitHybridTableRouteInfo routeInfo : logicalTableRouteInfo.getRealtimeTables()) { + tableRouteProvider.fillRouteMetadata(routeInfo, routingManager); + } + } + + if (logicalTableRouteInfo.isHybrid()) { + TimeBoundaryStrategy timeBoundaryStrategy = logicalTableRouteInfo.getTimeBoundaryStrategy(); + if (timeBoundaryStrategy != null) { + TimeBoundaryInfo timeBoundaryInfo = timeBoundaryStrategy.computeTimeBoundary(routingManager); + if (timeBoundaryInfo == null) { + LOGGER.info("No time boundary info found for logical hybrid table: "); + logicalTableRouteInfo.setOfflineTables(null); + } else { + logicalTableRouteInfo.setTimeBoundaryInfo(timeBoundaryInfo); + } } } - return routeInfo; } @Override @@ -107,8 +139,7 @@ public class LogicalTableRouteProvider implements TableRouteProvider { if (routeInfo.getOfflineTables() != null) { for (TableRouteInfo physicalTableInfo : routeInfo.getOfflineTables()) { - routeProvider.calculateRoutes(physicalTableInfo, routingManager, offlineBrokerRequest, null, - requestId); + routeProvider.calculateRoutes(physicalTableInfo, routingManager, offlineBrokerRequest, null, requestId); numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal(); if (physicalTableInfo.getUnavailableSegments() != null) { unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments()); @@ -118,8 +149,7 @@ public class LogicalTableRouteProvider implements TableRouteProvider { if (routeInfo.getRealtimeTables() != null) { for (TableRouteInfo physicalTableInfo : routeInfo.getRealtimeTables()) { - routeProvider.calculateRoutes(physicalTableInfo, routingManager, null, realtimeBrokerRequest, - requestId); + routeProvider.calculateRoutes(physicalTableInfo, routingManager, null, realtimeBrokerRequest, requestId); numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal(); if (physicalTableInfo.getUnavailableSegments() != null) { unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments()); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java index 287afcdd7b..04d09fd634 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java @@ -45,7 +45,6 @@ public class PhysicalTableRouteProvider extends ImplicitHybridTableRouteProvider @Override public void calculateRoutes(TableRouteInfo tableRouteInfo, RoutingManager routingManager, BrokerRequest offlineBrokerRequest, BrokerRequest realtimeBrokerRequest, long requestId) { - assert (tableRouteInfo.isExists()); String offlineTableName = tableRouteInfo.getOfflineTableName(); String realtimeTableName = tableRouteInfo.getRealtimeTableName(); Map<ServerInstance, ServerRouteInfo> offlineRoutingTable = null; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java index 04be7d3cdf..e204c917da 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.timeboundary; import com.google.auto.service.AutoService; import com.google.common.base.Preconditions; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.common.config.provider.TableCache; @@ -36,7 +37,27 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; @AutoService(TimeBoundaryStrategy.class) public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy { - public static final String INCLUDED_TABLES = "includedTables"; + private static final String INCLUDED_TABLES = "includedTables"; + Map<String, DateTimeFormatSpec> _dateTimeFormatSpecMap; + + @Override + public void init(LogicalTableConfig logicalTableConfig, TableCache tableCache) { + List<String> includedTables = getTimeBoundaryTableNames(logicalTableConfig); + _dateTimeFormatSpecMap = new HashMap<>(includedTables.size()); + for (String physicalTableName : includedTables) { + String rawTableName = TableNameBuilder.extractRawTableName(physicalTableName); + Schema schema = tableCache.getSchema(rawTableName); + TableConfig tableConfig = tableCache.getTableConfig(physicalTableName); + Preconditions.checkArgument(tableConfig != null, "Table config not found for table: %s", physicalTableName); + Preconditions.checkArgument(schema != null, "Schema not found for table: %s", physicalTableName); + String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); + DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName); + Preconditions.checkArgument(dateTimeFieldSpec != null, "Time column not found in schema for table: %s", + physicalTableName); + DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec(); + _dateTimeFormatSpecMap.put(physicalTableName, specFormatSpec); + } + } @Override public String getName() { @@ -44,29 +65,13 @@ public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy { } @Override - public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache, - RoutingManager routingManager) { + public TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager) { TimeBoundaryInfo minTimeBoundaryInfo = null; long minTimeBoundary = Long.MAX_VALUE; - Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters(); - List<String> includedTables = - parameters != null ? (List) parameters.getOrDefault("includedTables", List.of()) : List.of(); - for (String physicalTableName : includedTables) { - TimeBoundaryInfo current = routingManager.getTimeBoundaryInfo(physicalTableName); + for (Map.Entry<String, DateTimeFormatSpec> entry : _dateTimeFormatSpecMap.entrySet()) { + TimeBoundaryInfo current = routingManager.getTimeBoundaryInfo(entry.getKey()); if (current != null) { - String rawTableName = TableNameBuilder.extractRawTableName(physicalTableName); - Schema schema = tableCache.getSchema(rawTableName); - TableConfig tableConfig = tableCache.getTableConfig(physicalTableName); - Preconditions.checkArgument(tableConfig != null, - "Table config not found for table: %s", physicalTableName); - Preconditions.checkArgument(schema != null, - "Schema not found for table: %s", physicalTableName); - String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); - DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName); - Preconditions.checkArgument(dateTimeFieldSpec != null, - "Time column not found in schema for table: %s", physicalTableName); - DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec(); - long currentTimeBoundaryMillis = specFormatSpec.fromFormatToMillis(current.getTimeValue()); + long currentTimeBoundaryMillis = entry.getValue().fromFormatToMillis(current.getTimeValue()); if (minTimeBoundaryInfo == null) { minTimeBoundaryInfo = current; minTimeBoundary = currentTimeBoundaryMillis; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java index 7a4ee21794..f2215b29b2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java @@ -34,16 +34,20 @@ public interface TimeBoundaryStrategy { */ String getName(); + /** + * Initializes the time boundary strategy with the given logical table configuration and table cache. + * @param logicalTableConfig The logical table configuration to use for initialization. + * @param tableCache The table cache to use for initialization. + */ + void init(LogicalTableConfig logicalTableConfig, TableCache tableCache); + /** * Computes the time boundary for the given physical table names. * - * @param logicalTableConfig The logical table configuration. - * @param tableCache The table cache to use for fetching table metadata. * @param routingManager The routing manager to use for computing the time boundary. * @return The computed time boundary information. */ - TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache, - RoutingManager routingManager); + TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager); /** diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java index 1c08dca4ae..71901f8432 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java @@ -170,7 +170,7 @@ public class BaseTableRouteTest { TimeBoundaryStrategyService mockService = mock(TimeBoundaryStrategyService.class); when(TimeBoundaryStrategyService.getInstance()).thenReturn(mockService); when(mockService.getTimeBoundaryStrategy(any())).thenReturn(_timeBoundaryStrategy); - when(_timeBoundaryStrategy.computeTimeBoundary(any(), any(), any())).thenReturn(mock(TimeBoundaryInfo.class)); + when(_timeBoundaryStrategy.computeTimeBoundary(any())).thenReturn(mock(TimeBoundaryInfo.class)); } @AfterClass diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java index 6beb5d3ec6..be8606d068 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java @@ -27,9 +27,9 @@ import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.routing.ServerRouteInfo; -import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.transport.TableRouteInfo; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -59,9 +59,8 @@ public class ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl } } - private ImplicitHybridTableRouteInfo getImplicitHybridTableRouteInfo(String tableName) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); + private TableRouteInfo getImplicitHybridTableRouteInfo(String tableName) { + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); BrokerRequestPair brokerRequestPair = getBrokerRequestPair(tableName, routeInfo.hasOffline(), routeInfo.hasRealtime(), routeInfo.getOfflineTableName(), routeInfo.getRealtimeTableName()); @@ -73,7 +72,7 @@ public class ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl private void assertTableRoute(String tableName, Map<String, Set<String>> expectedOfflineRoutingTable, Map<String, Set<String>> expectedRealtimeRoutingTable, boolean isOfflineExpected, boolean isRealtimeExpected) { - ImplicitHybridTableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); + TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); // If a routing table for offline table is expected, then compare it with the expected routing table. if (isOfflineExpected) { @@ -281,7 +280,7 @@ public class ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl private void assertEqualsTableRouteInfoGetTableRouteResult(String tableName, Map<String, Set<String>> expectedOfflineRoutingTable, Map<String, Set<String>> expectedRealtimeRoutingTable, boolean isOfflineExpected, boolean isRealtimeExpected) { - ImplicitHybridTableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); + TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); GetTableRouteResult expectedTableRoute = getTableRouting(tableName, _routingManager); if (isOfflineExpected) { @@ -332,7 +331,7 @@ public class ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl @Test(dataProvider = "routeNotExistsProvider") void testTableRoutingForRouteNotExists(String tableName) { - ImplicitHybridTableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); + TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); GetTableRouteResult expectedTableRoute = getTableRouting(tableName, _routingManager); assertNull(expectedTableRoute._offlineRoutingTable); @@ -348,7 +347,7 @@ public class ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl @Test(dataProvider = "partiallyDisabledTableAndRouteProvider") void testTableRoutingForPartiallyDisabledTable(String tableName, Map<String, Set<String>> expectedOfflineRoutingTable, Map<String, Set<String>> expectedRealtimeRoutingTable) { - ImplicitHybridTableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); + TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); GetTableRouteResult expectedTableRoute = getTableRouting(tableName, _routingManager); if (expectedOfflineRoutingTable == null) { @@ -376,7 +375,7 @@ public class ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl @Test(dataProvider = "disabledTableProvider") void testTableRoutingForDisabledTable(String tableName) { - ImplicitHybridTableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); + TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName); GetTableRouteResult expectedTableRoute = getTableRouting(tableName, _routingManager); if (expectedTableRoute._offlineTableDisabled) { diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java index 2a5e046f49..ddd34ec9da 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java @@ -21,7 +21,7 @@ package org.apache.pinot.query.routing.table; import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.QueryProcessingException; -import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo; +import org.apache.pinot.core.transport.TableRouteInfo; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.exception.QueryErrorCode; @@ -41,26 +41,21 @@ import static org.testng.Assert.assertTrue; public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTableRouteTest { @Test(dataProvider = "offlineTableProvider") public void testOfflineTable(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isOffline(), "The table should be offline"); } @Test(dataProvider = "realtimeTableProvider") public void testRealtimeTable(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isRealtime(), "The table should be realtime"); } @Test(dataProvider = "hybridTableProvider") public void testHybridTable(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isHybrid(), "The table should be hybrid"); @@ -71,62 +66,48 @@ public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable */ @Test public void testWithNoTimeBoundary() { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo("b", _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo("b", _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isRealtime(), "The table should be realtime"); } @Test(dataProvider = "nonExistentTableProvider") public void testNonExistentTableName(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertFalse(routeInfo.isExists(), "The table should not exist"); } @Test(dataProvider = "routeExistsProvider") public void testRouteExists(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isRouteExists(), "The table should have route"); } @Test(dataProvider = "routeNotExistsProvider") public void testRouteNotExists(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertFalse(routeInfo.isRouteExists(), "The table should not have route"); } @Test(dataProvider = "notDisabledTableProvider") public void testNotDisabledTable(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertFalse(routeInfo.isDisabled(), "The table should not be disabled"); } @Test(dataProvider = "partiallyDisabledTableProvider") public void testPartiallyDisabledTable(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertFalse(routeInfo.isDisabled(), "The table should be disabled"); } @Test(dataProvider = "disabledTableProvider") public void testDisabledTable(String parameter) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); - + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, _routingManager); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isDisabled(), "The table should not have route"); } @@ -158,10 +139,8 @@ public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable /** * Similar if offlineTableName is not null and there is a route in the routing routeInfo. Same for * realtimeTableName. - * @param routeInfo - * @return */ - boolean similar(ImplicitHybridTableRouteInfo routeInfo) { + boolean similar(TableRouteInfo routeInfo) { boolean isEquals = true; if (_offlineTableName != null) { @@ -272,13 +251,11 @@ public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable /** * This test checks tables that have a table config and an entry in routing manager. * It makes sure that getTableNameAndConfig() behaves the same way as ImplicitTableRouteComputer. - * @param tableName */ @Test(dataProvider = "tableNameAndConfigSuccessProvider") public void testTableNameAndConfigSuccess(String tableName) { TableNameAndConfig tableNameAndConfig = getTableNameAndConfig(tableName); - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); assertTrue(tableNameAndConfig.similar(routeInfo), "The table name and config should match the hybrid table"); } @@ -303,13 +280,11 @@ public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable /** * This test checks tables that do not have a table config or an entry in routing manager. - * @param tableName */ @Test(dataProvider = "tableNameAndConfigFailureProvider") public void testTableNameAndConfigFailure(String tableName) { TableNameAndConfig tableNameAndConfig = getTableNameAndConfig(tableName); - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); // getTableNameAndConfig() returns an error as a BrokerResponse with the right error code. assertNotNull(tableNameAndConfig._brokerResponse); @@ -373,12 +348,10 @@ public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable /** * If a table is not disabled, then checkTableDisabled() should return null. * ImplicitTableRouteComputer should not be disabled. - * @param tableName */ @Test(dataProvider = "notDisabledTableProvider") public void testNotDisabledWithCheckDisabled(String tableName) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); ExceptionOrResponse exceptionOrResponse = checkTableDisabled(routeInfo.hasOffline() && routeInfo.isOfflineTableDisabled(), @@ -393,12 +366,10 @@ public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable /** * In a hybrid table, if one of the tables is disabled, then checkTableDisabled() should return an exception. * ImplicitTableRouteComputer should not be disabled. - * @param tableName */ @Test(dataProvider = "partiallyDisabledTableProvider") public void testPartiallyDisabledWithCheckDisabled(String tableName) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); ExceptionOrResponse exceptionOrResponse = checkTableDisabled(routeInfo.hasOffline() && routeInfo.isOfflineTableDisabled(), @@ -416,12 +387,10 @@ public class ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable * If a table is disabled, then checkTableDisabled() should return a broker response with error code * TABLE_IS_DISABLED. * ImplicitTableRouteComputer should be disabled. - * @param tableName */ @Test(dataProvider = "disabledTableProvider") public void testDisabledWithCheckDisabled(String tableName) { - ImplicitHybridTableRouteInfo routeInfo = - _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); + TableRouteInfo routeInfo = _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, _routingManager); ExceptionOrResponse exceptionOrResponse = checkTableDisabled(routeInfo.hasOffline() && routeInfo.isOfflineTableDisabled(), diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java index 464ff805d9..7e0ce2d2fe 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java @@ -84,7 +84,6 @@ public class MinTimeBoundaryStrategyTest { return new Object[][]{ {timeBoundaryInfoMap, List.of("table3_OFFLINE"), "table3_OFFLINE"}, - {timeBoundaryInfoMap, List.of("Invalid_OFFLINE"), "Invalid_OFFLINE"}, {timeBoundaryInfoMap, List.of("table2_OFFLINE", "table3_OFFLINE"), "table2_OFFLINE"}, {timeBoundaryInfoMap, List.of("table3_OFFLINE", "table2_OFFLINE", "table4_OFFLINE"), "table2_OFFLINE"}, {timeBoundaryInfoMap, List.of(), "empty_includedTables_OFFLINE"} @@ -102,8 +101,8 @@ public class MinTimeBoundaryStrategyTest { private void testComputeTimeBoundary(Map<String, TimeBoundaryInfo> timeBoundaryInfoMap, String expectedTableName, Map<String, Object> parameters) { setupMocks(timeBoundaryInfoMap); - TimeBoundaryInfo timeBoundaryInfo = _minTimeBoundaryStrategy.computeTimeBoundary( - createLogicalTableConfig(parameters), _mockTableCache, _mockRoutingManager); + _minTimeBoundaryStrategy.init(createLogicalTableConfig(parameters), _mockTableCache); + TimeBoundaryInfo timeBoundaryInfo = _minTimeBoundaryStrategy.computeTimeBoundary(_mockRoutingManager); assertSame(timeBoundaryInfo, timeBoundaryInfoMap.get(expectedTableName)); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 8bd40d51c0..b0eb2e12d0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -36,10 +36,12 @@ import org.apache.pinot.common.request.Expression; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.request.TableSegmentsInfo; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.LogicalTableContext; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -112,8 +114,14 @@ public class ServerPlanRequestUtils { // 2. Convert PinotQuery into InstanceRequest list (one for each physical table) PinotQuery pinotQuery = serverContext.getPinotQuery(); pinotQuery.setExplain(explain); - List<InstanceRequest> instanceRequests = - constructServerQueryRequests(executionContext, pinotQuery, leafQueryExecutor.getInstanceDataManager()); + List<InstanceRequest> instanceRequests; + if (executionContext.getWorkerMetadata().getLogicalTableSegmentsMap() != null) { + instanceRequests = constructLogicalTableServerQueryRequests(executionContext, pinotQuery, + leafQueryExecutor.getInstanceDataManager()); + } else { + instanceRequests = + constructServerQueryRequests(executionContext, pinotQuery, leafQueryExecutor.getInstanceDataManager()); + } int numRequests = instanceRequests.size(); List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(numRequests); for (InstanceRequest instanceRequest : instanceRequests) { @@ -163,16 +171,20 @@ public class ServerPlanRequestUtils { TableDataManager tableDataManager = instanceDataManager.getTableDataManager(offlineTableName); Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: %s", offlineTableName); - return List.of(compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.OFFLINE, segments, - tableDataManager)); + Pair<TableConfig, Schema> tableConfigAndSchema = tableDataManager.getCachedTableConfigAndSchema(); + return List.of(compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.OFFLINE, + tableDataManager.getTableName(), tableConfigAndSchema.getLeft(), tableConfigAndSchema.getRight(), segments, + null)); } else { assert tableType.equals(TableType.REALTIME.name()); String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName); TableDataManager tableDataManager = instanceDataManager.getTableDataManager(realtimeTableName); Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: %s", realtimeTableName); - return List.of(compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.REALTIME, segments, - tableDataManager)); + Pair<TableConfig, Schema> tableConfigAndSchema = tableDataManager.getCachedTableConfigAndSchema(); + return List.of(compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.REALTIME, + tableDataManager.getTableName(), tableConfigAndSchema.getLeft(), tableConfigAndSchema.getRight(), segments, + null)); } } else { assert numRequests == 2; @@ -183,16 +195,21 @@ public class ServerPlanRequestUtils { TableDataManager offlineTableDataManager = instanceDataManager.getTableDataManager(offlineTableName); Preconditions.checkState(offlineTableDataManager != null, "Failed to find data manager for table: %s", offlineTableName); + Pair<TableConfig, Schema> offlineTableConfigAndSchema = offlineTableDataManager.getCachedTableConfigAndSchema(); String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName); TableDataManager realtimeTableDataManager = instanceDataManager.getTableDataManager(realtimeTableName); Preconditions.checkState(realtimeTableDataManager != null, "Failed to find data manager for table: %s", realtimeTableName); + Pair<TableConfig, Schema> realtimeTableConfigAndSchema = + realtimeTableDataManager.getCachedTableConfigAndSchema(); // NOTE: Make a deep copy of PinotQuery for OFFLINE request. return List.of( compileInstanceRequest(executionContext, new PinotQuery(pinotQuery), timeBoundary, TableType.OFFLINE, - offlineSegments, offlineTableDataManager), - compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.REALTIME, realtimeSegments, - realtimeTableDataManager)); + offlineTableDataManager.getTableName(), offlineTableConfigAndSchema.getLeft(), + offlineTableConfigAndSchema.getRight(), offlineSegments, null), + compileInstanceRequest(executionContext, pinotQuery, timeBoundary, TableType.REALTIME, + realtimeTableDataManager.getTableName(), realtimeTableConfigAndSchema.getLeft(), + realtimeTableConfigAndSchema.getRight(), realtimeSegments, null)); } } @@ -200,13 +217,16 @@ public class ServerPlanRequestUtils { * Convert {@link PinotQuery} into an {@link InstanceRequest}. */ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext executionContext, PinotQuery pinotQuery, - @Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList, - TableDataManager tableDataManager) { + @Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType, + String tableNameWithType, TableConfig tableConfig, Schema schema, @Nullable List<String> segmentList, + @Nullable List<TableSegmentsInfo> tableRouteInfoList) { + Preconditions.checkArgument(segmentList == null || tableRouteInfoList == null, + "Either segmentList OR tableRouteInfoList should be set"); + // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing. long requestId = (executionContext.getRequestId() << 16) + ((long) executionContext.getStageId() << 8) + ( tableType == TableType.REALTIME ? 1 : 0); // 1. Modify the PinotQuery - String tableNameWithType = tableDataManager.getTableName(); pinotQuery.getDataSource().setTableName(tableNameWithType); if (timeBoundaryInfo != null) { attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == TableType.OFFLINE); @@ -214,8 +234,7 @@ public class ServerPlanRequestUtils { for (QueryRewriter queryRewriter : QUERY_REWRITERS) { pinotQuery = queryRewriter.rewrite(pinotQuery); } - Pair<TableConfig, Schema> tableConfigAndSchema = tableDataManager.getCachedTableConfigAndSchema(); - QUERY_OPTIMIZER.optimize(pinotQuery, tableConfigAndSchema.getLeft(), tableConfigAndSchema.getRight()); + QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema); // 2. Update query options according to requestMetadataMap updateQueryOptions(pinotQuery, executionContext); @@ -233,7 +252,17 @@ public class ServerPlanRequestUtils { instanceRequest.setCid(QueryThreadContext.getCid()); instanceRequest.setBrokerId("unknown"); instanceRequest.setEnableTrace(executionContext.isTraceEnabled()); - instanceRequest.setSearchSegments(segmentList); + /* + * If segmentList is not null, it means that the query is for a single table and we can directly set the segments. + * If segmentList is null, it means that the query is for a logical table and we need to set TableSegmentInfoList + * + * Either one of segmentList or tableRouteInfoList has to be set, but not both. + */ + if (segmentList != null) { + instanceRequest.setSearchSegments(segmentList); + } else { + instanceRequest.setTableSegmentsInfoList(tableRouteInfoList); + } instanceRequest.setQuery(brokerRequest); return instanceRequest; @@ -386,4 +415,69 @@ public class ServerPlanRequestUtils { } return expressions; } + + private static List<InstanceRequest> constructLogicalTableServerQueryRequests( + OpChainExecutionContext executionContext, PinotQuery pinotQuery, InstanceDataManager instanceDataManager) { + StageMetadata stageMetadata = executionContext.getStageMetadata(); + String logicalTableName = stageMetadata.getTableName(); + LogicalTableContext logicalTableContext = instanceDataManager.getLogicalTableContext(logicalTableName); + Preconditions.checkNotNull(logicalTableContext, + "LogicalTableManager not found for logical table name: " + logicalTableName); + + Map<String, List<String>> logicalTableSegmentsMap = + executionContext.getWorkerMetadata().getLogicalTableSegmentsMap(); + List<TableSegmentsInfo> offlineTableRouteInfoList = new ArrayList<>(); + List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>(); + + Preconditions.checkNotNull(logicalTableSegmentsMap); + for (Map.Entry<String, List<String>> entry: logicalTableSegmentsMap.entrySet()) { + String physicalTableName = entry.getKey(); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(physicalTableName); + TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo(); + tableSegmentsInfo.setTableName(physicalTableName); + tableSegmentsInfo.setSegments(entry.getValue()); + if (tableType == TableType.REALTIME) { + realtimeTableRouteInfoList.add(tableSegmentsInfo); + } else { + offlineTableRouteInfoList.add(tableSegmentsInfo); + } + } + + TimeBoundaryInfo timeBoundaryInfo = stageMetadata.getTimeBoundary(); + + if (offlineTableRouteInfoList.isEmpty() || realtimeTableRouteInfoList.isEmpty()) { + List<TableSegmentsInfo> routeInfoList = + offlineTableRouteInfoList.isEmpty() ? realtimeTableRouteInfoList : offlineTableRouteInfoList; + String tableType = offlineTableRouteInfoList.isEmpty() ? TableType.REALTIME.name() : TableType.OFFLINE.name(); + if (tableType.equals(TableType.OFFLINE.name())) { + Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig()); + String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName); + return List.of( + compileInstanceRequest(executionContext, pinotQuery, timeBoundaryInfo, TableType.OFFLINE, offlineTableName, + logicalTableContext.getRefOfflineTableConfig(), logicalTableContext.getLogicalTableSchema(), null, + routeInfoList)); + } else { + Preconditions.checkNotNull(logicalTableContext.getRefRealtimeTableConfig()); + String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(logicalTableName); + return List.of( + compileInstanceRequest(executionContext, pinotQuery, timeBoundaryInfo, TableType.REALTIME, + realtimeTableName, logicalTableContext.getRefRealtimeTableConfig(), + logicalTableContext.getLogicalTableSchema(), null, routeInfoList)); + } + } else { + Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig()); + Preconditions.checkNotNull(logicalTableContext.getRefRealtimeTableConfig()); + String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName); + String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(logicalTableName); + PinotQuery offlinePinotQuery = pinotQuery.deepCopy(); + PinotQuery realtimePinotQuery = pinotQuery.deepCopy(); + return List.of( + compileInstanceRequest(executionContext, offlinePinotQuery, timeBoundaryInfo, TableType.OFFLINE, + offlineTableName, logicalTableContext.getRefOfflineTableConfig(), + logicalTableContext.getLogicalTableSchema(), null, offlineTableRouteInfoList), + compileInstanceRequest(executionContext, realtimePinotQuery, timeBoundaryInfo, TableType.REALTIME, + realtimeTableName, logicalTableContext.getRefRealtimeTableConfig(), + logicalTableContext.getLogicalTableSchema(), null, realtimeTableRouteInfoList)); + } + } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 292833ff85..c71321b6d1 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -47,6 +47,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.LogicalTableContext; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; @@ -64,6 +65,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.plugin.PluginManager; @@ -517,4 +519,42 @@ public class HelixInstanceDataManager implements InstanceDataManager { }); } } + + // TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859 + @Nullable + @Override + public LogicalTableContext getLogicalTableContext(String logicalTableName) { + Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(), logicalTableName); + if (schema == null) { + LOGGER.warn("Failed to find schema for logical table: {}, skipping", logicalTableName); + return null; + } + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(), + logicalTableName); + if (logicalTableConfig == null) { + LOGGER.warn("Failed to find logical table config for logical table: {}, skipping", logicalTableName); + return null; + } + + TableConfig offlineTableConfig = null; + if (logicalTableConfig.getRefOfflineTableName() != null) { + offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(), + logicalTableConfig.getRefOfflineTableName()); + if (offlineTableConfig == null) { + LOGGER.warn("Failed to find offline table config for logical table: {}, skipping", logicalTableName); + return null; + } + } + + TableConfig realtimeTableConfig = null; + if (logicalTableConfig.getRefRealtimeTableName() != null) { + realtimeTableConfig = ZKMetadataProvider.getRealtimeTableConfig(getPropertyStore(), + logicalTableConfig.getRefRealtimeTableName()); + if (realtimeTableConfig == null) { + LOGGER.warn("Failed to find realtime table config for logical table: {}, skipping", logicalTableName); + return null; + } + } + return new LogicalTableContext(logicalTableConfig, schema, offlineTableConfig, realtimeTableConfig); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org