This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 08769d2d7b Logical table CRUD operations. (#15515) 08769d2d7b is described below commit 08769d2d7b678119ef9622b95cc5307ef7663dcf Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Tue May 6 14:47:46 2025 +0530 Logical table CRUD operations. (#15515) * Logical table CRUD operations. * Logical tables znRecord update, validations and more unit tests. * Update test comment messages * Code refactoring and more unit tests. * more unit tests. * Add logical table to table cache. * Code refactoring to handle code duplication. * Store logical table json using jackson. * remove sout statement. * Code refactoring - physical tables config as map. * Fix checkstyle violations. * Review comments addressed. * Code refactoring and address review comments. * Addressing review comments. * Refactor LogicalTable to LogicalTableConfig. * Refactor LogicalTable to LogicalTableConfig. * Refactor LogicalTable to LogicalTableConfig. * more unit tests. 
--------- Co-authored-by: abhishekbafna <abhishek.ba...@startree.ai> --- .../BaseSingleStageBrokerRequestHandler.java | 7 +- .../pinot/common/config/provider/TableCache.java | 198 ++++++++++++- .../pinot/common/metadata/ZKMetadataProvider.java | 55 +++- .../pinot/common/utils/LogicalTableUtils.java | 118 ++++++++ .../api/resources/PinotLogicalTableResource.java | 263 +++++++++++++++++ .../helix/core/PinotHelixResourceManager.java | 59 ++++ .../PinotAdminUserLogicalTableResourceTest.java | 51 ++++ .../resources/PinotLogicalTableResourceTest.java | 311 +++++++++++++++++++++ ...inotUserWithAccessLogicalTableResourceTest.java | 173 ++++++++++++ .../pinot/controller/helix/ControllerTest.java | 52 ++++ .../pinot/controller/helix/TableCacheTest.java | 97 +++++++ .../provider/LogicalTableConfigChangeListener.java | 31 ++ .../spi/config/provider/PinotConfigProvider.java | 16 ++ .../apache/pinot/spi/data/LogicalTableConfig.java | 118 ++++++++ .../apache/pinot/spi/data/PhysicalTableConfig.java | 29 ++ .../utils/builder/ControllerRequestURLBuilder.java | 20 ++ .../utils/builder/LogicalTableConfigBuilder.java | 53 ++++ 17 files changed, 1646 insertions(+), 5 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 97510585b8..3f923ba0ef 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -1178,10 +1178,11 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ @VisibleForTesting static String getActualTableName(String tableName, TableCache tableCache) { String actualTableName = tableCache.getActualTableName(tableName); - if (actualTableName != null) { - return actualTableName; + // 
If actual table name is not found for physical table, check in the logical tables + if (actualTableName == null) { + actualTableName = tableCache.getActualLogicalTableName(tableName); } - return tableName; + return actualTableName != null ? actualTableName : tableName; } /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java index 17d953abec..726df76930 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java @@ -25,9 +25,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -37,8 +39,10 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.utils.LogicalTableUtils; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener; import org.apache.pinot.spi.config.provider.PinotConfigProvider; import org.apache.pinot.spi.config.provider.SchemaChangeListener; import org.apache.pinot.spi.config.provider.TableConfigChangeListener; @@ -46,6 +50,7 @@ import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.DimensionFieldSpec; import 
org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn; import org.apache.pinot.spi.utils.TimestampIndexUtils; @@ -66,6 +71,8 @@ public class TableCache implements PinotConfigProvider { private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/"; private static final String SCHEMA_PARENT_PATH = "/SCHEMAS"; private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/"; + private static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE"; + private static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/"; private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE"; private static final String REALTIME_TABLE_SUFFIX = "_REALTIME"; private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = OFFLINE_TABLE_SUFFIX.toLowerCase(); @@ -74,6 +81,7 @@ public class TableCache implements PinotConfigProvider { // NOTE: No need to use concurrent set because it is always accessed within the ZK change listener lock private final Set<TableConfigChangeListener> _tableConfigChangeListeners = new HashSet<>(); private final Set<SchemaChangeListener> _schemaChangeListeners = new HashSet<>(); + private final Set<LogicalTableConfigChangeListener> _logicalTableConfigChangeListeners = new HashSet<>(); private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private final boolean _ignoreCase; @@ -89,6 +97,14 @@ public class TableCache implements PinotConfigProvider { // Key is schema name, value is schema info private final Map<String, SchemaInfo> _schemaInfoMap = new ConcurrentHashMap<>(); + private final ZkLogicalTableConfigChangeListener + _zkLogicalTableConfigChangeListener = new ZkLogicalTableConfigChangeListener(); + // Key is table name, value is logical table info + private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap = new ConcurrentHashMap<>(); + // Key is lower case 
logical table name, value is actual logical table name + // For case-insensitive mode only + private final Map<String, String> _logicalTableNameMap = new ConcurrentHashMap<>(); + public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean ignoreCase) { _propertyStore = propertyStore; _ignoreCase = ignoreCase; @@ -121,6 +137,19 @@ public class TableCache implements PinotConfigProvider { } } + synchronized (_zkLogicalTableConfigChangeListener) { + // Subscribe child changes before reading the data to avoid missing changes + _propertyStore.subscribeChildChanges(LOGICAL_TABLE_PARENT_PATH, _zkLogicalTableConfigChangeListener); + + List<String> tables = _propertyStore.getChildNames(LOGICAL_TABLE_PARENT_PATH, AccessOption.PERSISTENT); + if (CollectionUtils.isNotEmpty(tables)) { + List<String> pathsToAdd = tables.stream() + .map(rawTableName -> LOGICAL_TABLE_PATH_PREFIX + rawTableName) + .collect(Collectors.toList()); + addLogicalTableConfigs(pathsToAdd); + } + } + LOGGER.info("Initialized TableCache with IgnoreCase: {}", ignoreCase); } @@ -144,6 +173,18 @@ public class TableCache implements PinotConfigProvider { } } + /** + * Returns the actual logical table name for the given table name, or {@code null} if table does not exist. + * @param logicalTableName Logical table name + * @return Actual logical table name + */ + @Nullable + public String getActualLogicalTableName(String logicalTableName) { + return _ignoreCase + ? _logicalTableNameMap.get(logicalTableName.toLowerCase()) + : _logicalTableNameMap.get(logicalTableName); + } + /** * Returns a map from table name to actual table name. For case-insensitive case, the keys of the map are in lower * case. @@ -152,6 +193,15 @@ public class TableCache implements PinotConfigProvider { return _tableNameMap; } + /** + * Returns a map from logical table name to actual logical table name. For case-insensitive case, the keys of the map + * are in lower case. 
+ * @return Map from logical table name to actual logical table name + */ + public Map<String, String> getLogicalTableNameMap() { + return _logicalTableNameMap; + } + /** * Get all dimension table names. * @return List of dimension table names @@ -204,6 +254,14 @@ public class TableCache implements PinotConfigProvider { return tableConfigInfo != null ? tableConfigInfo._tableConfig : null; } + @Nullable + @Override + public LogicalTableConfig getLogicalTableConfig(String logicalTableName) { + logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName; + LogicalTableConfigInfo logicalTableConfigInfo = _logicalTableConfigInfoMap.get(logicalTableName); + return logicalTableConfigInfo != null ? logicalTableConfigInfo._logicalTableConfig : null; + } + @Override public boolean registerTableConfigChangeListener(TableConfigChangeListener tableConfigChangeListener) { synchronized (_zkTableConfigChangeListener) { @@ -216,15 +274,33 @@ public class TableCache implements PinotConfigProvider { } /** - * Returns the schema for the given table, or {@code null} if it does not exist. + * Returns the schema for the given logical or physical table, or {@code null} if it does not exist. */ @Nullable @Override public Schema getSchema(String rawTableName) { + if (_schemaInfoMap.containsKey(rawTableName)) { + return getPhysicalTableSchema(rawTableName); + } else { + return getLogicalTableSchema(rawTableName); + } + } + + private Schema getPhysicalTableSchema(String rawTableName) { SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName); return schemaInfo != null ? 
schemaInfo._schema : null; } + @Nullable + private Schema getLogicalTableSchema(String logicalTableName) { + LogicalTableConfig logicalTableConfig = getLogicalTableConfig(logicalTableName); + if (logicalTableConfig == null) { + return null; + } + Optional<String> physicalTableName = logicalTableConfig.getPhysicalTableConfigMap().keySet().stream().findFirst(); + return getPhysicalTableSchema(TableNameBuilder.extractRawTableName(physicalTableName.orElse(null))); + } + @Override public boolean registerSchemaChangeListener(SchemaChangeListener schemaChangeListener) { synchronized (_zkSchemaChangeListener) { @@ -253,6 +329,23 @@ public class TableCache implements PinotConfigProvider { } } + private void addLogicalTableConfigs(List<String> paths) { + // Subscribe data changes before reading the data to avoid missing changes + for (String path : paths) { + _propertyStore.subscribeDataChanges(path, _zkLogicalTableConfigChangeListener); + } + List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT); + for (ZNRecord znRecord : znRecords) { + if (znRecord != null) { + try { + putLogicalTableConfig(znRecord); + } catch (Exception e) { + LOGGER.error("Caught exception while adding logical table for ZNRecord: {}", znRecord.getId(), e); + } + } + } + } + private void putTableConfig(ZNRecord znRecord) throws IOException { TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); @@ -268,6 +361,19 @@ public class TableCache implements PinotConfigProvider { } } + private void putLogicalTableConfig(ZNRecord znRecord) + throws IOException { + LogicalTableConfig logicalTableConfig = LogicalTableUtils.fromZNRecord(znRecord); + String logicalTableName = logicalTableConfig.getTableName(); + if (_ignoreCase) { + _logicalTableNameMap.put(logicalTableName.toLowerCase(), logicalTableName); + _logicalTableConfigInfoMap.put(logicalTableName.toLowerCase(), new LogicalTableConfigInfo(logicalTableConfig)); + } else { + _logicalTableNameMap.put(logicalTableName, 
logicalTableName); + _logicalTableConfigInfoMap.put(logicalTableName, new LogicalTableConfigInfo(logicalTableConfig)); + } + } + private void removeTableConfig(String path) { _propertyStore.unsubscribeDataChanges(path, _zkTableConfigChangeListener); String tableNameWithType = path.substring(TABLE_CONFIG_PATH_PREFIX.length()); @@ -299,6 +405,14 @@ public class TableCache implements PinotConfigProvider { } } + private void removeLogicalTableConfig(String path) { + _propertyStore.unsubscribeDataChanges(path, _zkLogicalTableConfigChangeListener); + String logicalTableName = path.substring(LOGICAL_TABLE_PATH_PREFIX.length()); + logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName; + _logicalTableConfigInfoMap.remove(logicalTableName); + _logicalTableNameMap.remove(logicalTableName); + } + private void addSchemas(List<String> paths) { // Subscribe data changes before reading the data to avoid missing changes for (String path : paths) { @@ -365,6 +479,15 @@ public class TableCache implements PinotConfigProvider { } } + private void notifyLogicalTableConfigChangeListeners() { + if (!_logicalTableConfigChangeListeners.isEmpty()) { + List<LogicalTableConfig> logicalTableConfigs = getLogicalTableConfigs(); + for (LogicalTableConfigChangeListener listener : _logicalTableConfigChangeListeners) { + listener.onChange(logicalTableConfigs); + } + } + } + private List<TableConfig> getTableConfigs() { List<TableConfig> tableConfigs = new ArrayList<>(_tableConfigInfoMap.size()); for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) { @@ -373,6 +496,10 @@ public class TableCache implements PinotConfigProvider { return tableConfigs; } + public List<LogicalTableConfig> getLogicalTableConfigs() { + return _logicalTableConfigInfoMap.values().stream().map(o -> o._logicalTableConfig).collect(Collectors.toList()); + } + private void notifySchemaChangeListeners() { if (!_schemaChangeListeners.isEmpty()) { List<Schema> schemas = getSchemas(); @@ 
-390,6 +517,23 @@ public class TableCache implements PinotConfigProvider { return schemas; } + public boolean isLogicalTable(String logicalTableName) { + logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName; + return _logicalTableConfigInfoMap.containsKey(logicalTableName); + } + + @Override + public boolean registerLogicalTableConfigChangeListener( + LogicalTableConfigChangeListener logicalTableConfigChangeListener) { + synchronized (_zkLogicalTableConfigChangeListener) { + boolean added = _logicalTableConfigChangeListeners.add(logicalTableConfigChangeListener); + if (added) { + logicalTableConfigChangeListener.onChange(getLogicalTableConfigs()); + } + return added; + } + } + private class ZkTableConfigChangeListener implements IZkChildListener, IZkDataListener { @Override @@ -476,6 +620,49 @@ public class TableCache implements PinotConfigProvider { } } + private class ZkLogicalTableConfigChangeListener implements IZkChildListener, IZkDataListener { + + @Override + public synchronized void handleChildChange(String path, List<String> logicalTableNames) { + if (CollectionUtils.isEmpty(logicalTableNames)) { + return; + } + + // Only process new added logical tables. Changed/removed logical tables are handled by other callbacks. 
+ List<String> pathsToAdd = new ArrayList<>(); + for (String logicalTableName : logicalTableNames) { + if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) { + pathsToAdd.add(LOGICAL_TABLE_PATH_PREFIX + logicalTableName); + } + } + if (!pathsToAdd.isEmpty()) { + addLogicalTableConfigs(pathsToAdd); + } + notifyLogicalTableConfigChangeListeners(); + } + + @Override + public synchronized void handleDataChange(String path, Object data) { + if (data != null) { + ZNRecord znRecord = (ZNRecord) data; + try { + putLogicalTableConfig(znRecord); + } catch (Exception e) { + LOGGER.error("Caught exception while refreshing logical table for ZNRecord: {}", znRecord.getId(), e); + } + notifyLogicalTableConfigChangeListeners(); + } + } + + @Override + public synchronized void handleDataDeleted(String path) { + // NOTE: The path here is the absolute ZK path instead of the relative path to the property store. + String logicalTableName = path.substring(path.lastIndexOf('/') + 1); + removeLogicalTableConfig(LOGICAL_TABLE_PATH_PREFIX + logicalTableName); + notifyLogicalTableConfigChangeListeners(); + } + } + private static class TableConfigInfo { final TableConfig _tableConfig; final Map<Expression, Expression> _expressionOverrideMap; @@ -522,4 +709,13 @@ public class TableCache implements PinotConfigProvider { _columnNameMap = columnNameMap; } } + + private static class LogicalTableConfigInfo { + final LogicalTableConfig _logicalTableConfig; + // TODO : Add expression override map for logical table, issue #15607 + + private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) { + _logicalTableConfig = logicalTableConfig; + } + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 096de78884..3025b1f5d9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -18,12 +18,15 @@ */ package org.apache.pinot.common.metadata; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -37,6 +40,7 @@ import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.LogicalTableUtils; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; @@ -45,6 +49,7 @@ import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.user.UserConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -67,6 +72,7 @@ public class ZKMetadataProvider { private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS"; private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX = "/PAUSELESS_DEBUG_METADATA"; private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS"; + private static final String PROPERTYSTORE_LOGICAL_PREFIX = "/LOGICAL/TABLE"; private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = "/INSTANCE_PARTITIONS"; private static final String 
PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = "/CONFIGS/DATABASE"; private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE"; @@ -304,6 +310,10 @@ public class ZKMetadataProvider { return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, taskType, tableNameWithType); } + public static String constructPropertyStorePathForLogical(String tableName) { + return StringUtil.join("/", PROPERTYSTORE_LOGICAL_PREFIX, tableName); + } + public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource, String segmentName) { return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName), @@ -376,6 +386,7 @@ public class ZKMetadataProvider { return propertyStore.remove(constructPropertyStorePathForSegment(tableNameWithType, segmentName), AccessOption.PERSISTENT); } + public static boolean removePauselessDebugMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) { String pauselessDebugMetadataPath = constructPropertyStorePathForPauselessDebugMetadata(tableNameWithType); @@ -385,7 +396,6 @@ public class ZKMetadataProvider { return true; } - @Nullable public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord> propertyStore, String path) { Stat stat = new Stat(); @@ -809,4 +819,47 @@ public class ZKMetadataProvider { return result; } } + + public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, + LogicalTableConfig logicalTableConfig) { + try { + ZNRecord znRecord = LogicalTableUtils.toZNRecord(logicalTableConfig); + String path = constructPropertyStorePathForLogical(logicalTableConfig.getTableName()); + propertyStore.set(path, znRecord, AccessOption.PERSISTENT); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to convert logical table to ZNRecord", e); + } + } + + public static List<LogicalTableConfig> getAllLogicalTableConfigs(ZkHelixPropertyStore<ZNRecord> 
propertyStore) { + List<ZNRecord> znRecords = + propertyStore.getChildren(PROPERTYSTORE_LOGICAL_PREFIX, null, AccessOption.PERSISTENT, 0, 0); + if (znRecords != null) { + return znRecords.stream().map(znRecord -> { + try { + return LogicalTableUtils.fromZNRecord(znRecord); + } catch (IOException e) { + LOGGER.error("Caught exception while converting ZNRecord to LogicalTable: {}", znRecord.getId(), e); + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toList()); + } else { + return Collections.emptyList(); + } + } + + public static LogicalTableConfig getLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, + String tableName) { + try { + ZNRecord logicalTableZNRecord = + propertyStore.get(constructPropertyStorePathForLogical(tableName), null, AccessOption.PERSISTENT); + if (logicalTableZNRecord == null) { + return null; + } + return LogicalTableUtils.fromZNRecord(logicalTableZNRecord); + } catch (Exception e) { + LOGGER.error("Caught exception while getting logical table: {}", tableName, e); + return null; + } + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java new file mode 100644 index 0000000000..0b5d715bd9 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.PhysicalTableConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +public class LogicalTableUtils { + + private LogicalTableUtils() { + // Utility class + } + + public static LogicalTableConfig fromZNRecord(ZNRecord record) + throws IOException { + LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder() + .setTableName(record.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY)) + .setBrokerTenant(record.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY)); + + Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); + for (Map.Entry<String, String> entry : record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY) + .entrySet()) { + String physicalTableName = entry.getKey(); + String physicalTableConfigJson = entry.getValue(); + physicalTableConfigMap.put(physicalTableName, + JsonUtils.stringToObject(physicalTableConfigJson, PhysicalTableConfig.class)); + } + builder.setPhysicalTableConfigMap(physicalTableConfigMap); + return builder.build(); + } + + public 
static ZNRecord toZNRecord(LogicalTableConfig logicalTableConfig) + throws JsonProcessingException { + Map<String, String> physicalTableConfigMap = new HashMap<>(); + for (Map.Entry<String, PhysicalTableConfig> entry : logicalTableConfig.getPhysicalTableConfigMap().entrySet()) { + String physicalTableName = entry.getKey(); + PhysicalTableConfig physicalTableConfig = entry.getValue(); + physicalTableConfigMap.put(physicalTableName, physicalTableConfig.toJsonString()); + } + + ZNRecord record = new ZNRecord(logicalTableConfig.getTableName()); + record.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY, logicalTableConfig.getTableName()); + record.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY, logicalTableConfig.getBrokerTenant()); + record.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY, physicalTableConfigMap); + return record; + } + + public static void validateLogicalTableName(LogicalTableConfig logicalTableConfig, List<String> allPhysicalTables, + Set<String> allBrokerTenantNames) { + String tableName = logicalTableConfig.getTableName(); + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException("Invalid logical table name. Reason: 'tableName' should not be null or empty"); + } + + if (TableNameBuilder.isOfflineTableResource(tableName) || TableNameBuilder.isRealtimeTableResource(tableName)) { + throw new IllegalArgumentException( + "Invalid logical table name. Reason: 'tableName' should not end with _OFFLINE or _REALTIME"); + } + + if (logicalTableConfig.getPhysicalTableConfigMap() == null || logicalTableConfig.getPhysicalTableConfigMap() + .isEmpty()) { + throw new IllegalArgumentException( + "Invalid logical table. 
Reason: 'physicalTableConfigMap' should not be null or empty"); + } + + for (Map.Entry<String, PhysicalTableConfig> entry : logicalTableConfig.getPhysicalTableConfigMap().entrySet()) { + String physicalTableName = entry.getKey(); + PhysicalTableConfig physicalTableConfig = entry.getValue(); + + // validate physical table exists + if (!allPhysicalTables.contains(physicalTableName)) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: '" + physicalTableName + "' should be one of the existing tables"); + } + // validate physical table config is not null + if (physicalTableConfig == null) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'physicalTableConfig' should not be null for physical table: " + + physicalTableName); + } + } + + // validate broker tenant + String brokerTenant = logicalTableConfig.getBrokerTenant(); + if (!allBrokerTenantNames.contains(brokerTenant)) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: '" + brokerTenant + "' should be one of the existing broker tenants"); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java new file mode 100644 index 0000000000..2e65ab9378 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.arrow.util.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.exception.TableNotFoundException; +import org.apache.pinot.common.utils.DatabaseUtils; +import org.apache.pinot.common.utils.LogicalTableUtils; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.api.exception.TableAlreadyExistsException; 
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.auth.Actions; +import org.apache.pinot.core.auth.Authorize; +import org.apache.pinot.core.auth.ManualAuthorization; +import org.apache.pinot.core.auth.TargetType; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.DATABASE; +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + + +@Api(tags = "LogicalTable", authorizations = { + @Authorization(value = SWAGGER_AUTHORIZATION_KEY), @Authorization(value = DATABASE) +}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { + @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, + key = SWAGGER_AUTHORIZATION_KEY, + description = "The format of the key is ```\"Basic <token>\" or \"Bearer <token>\"```"), + @ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE, + description = "Database context passed through http header. 
If no context is provided 'default' " + + "database context will be considered.") +})) +@Path("/") +public class PinotLogicalTableResource { + public static final Logger LOGGER = LoggerFactory.getLogger(PinotLogicalTableResource.class); + private static final String DEFAULT_BROKER_TENANT = "DefaultTenant"; + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + AccessControlFactory _accessControlFactory; + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/logicalTables") + @Authorize(targetType = TargetType.CLUSTER, paramName = "tableName", action = Actions.Cluster.GET_TABLE) + @ApiOperation(value = "List all logical table names", notes = "Lists all logical table names") + public List<String> listLogicalTableNames(@Context HttpHeaders headers) { + return _pinotHelixResourceManager.getAllLogicalTableNames(); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/logicalTables/{tableName}") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_TABLE_CONFIG) + @ApiOperation(value = "Get a logical table", notes = "Gets a logical table by name") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, message = "Logical table not found"), + @ApiResponse(code = 500, message = "Internal error") + }) + public String getLogicalTable( + @ApiParam(value = "Logical table name", required = true) @PathParam("tableName") String tableName, + @Context HttpHeaders headers) { + tableName = DatabaseUtils.translateTableName(tableName, headers); + LOGGER.info("Looking for logical table {}", tableName); + LogicalTableConfig logicalTableConfig = _pinotHelixResourceManager.getLogicalTableConfig(tableName); + if (logicalTableConfig == null) { + throw new ControllerApplicationException(LOGGER, "Logical table not found", Response.Status.NOT_FOUND); + } + return logicalTableConfig.toPrettyJsonString(); + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + 
@Path("/logicalTables") + @ApiOperation(value = "Add a new logical table", notes = "Adds a new logical table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully created logical table"), @ApiResponse(code = 409, message = + "Logical table already exists"), @ApiResponse(code = 400, message = "Missing or invalid request body"), + @ApiResponse(code = 500, message = "Internal error") + }) + @ManualAuthorization + public SuccessResponse addLogicalTable( + String logicalTableJsonString, @Context HttpHeaders httpHeaders, + @Context Request request) { + Pair<LogicalTableConfig, Map<String, Object>> logicalTableConfigAndUnrecognizedProps = + getLogicalAndUnrecognizedPropertiesFromJson(logicalTableJsonString); + LogicalTableConfig logicalTableConfig = logicalTableConfigAndUnrecognizedProps.getLeft(); + String tableName = DatabaseUtils.translateTableName(logicalTableConfig.getTableName(), httpHeaders); + logicalTableConfig.setTableName(tableName); + + // validate permission + ResourceUtils.checkPermissionAndAccess(tableName, request, httpHeaders, AccessType.CREATE, + Actions.Table.CREATE_TABLE, _accessControlFactory, LOGGER); + + SuccessResponse successResponse = addLogicalTable(logicalTableConfig); + return new ConfigSuccessResponse(successResponse.getStatus(), logicalTableConfigAndUnrecognizedProps.getRight()); + } + + @PUT + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/logicalTables/{tableName}") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.UPDATE_TABLE_CONFIG) + @Authenticate(AccessType.UPDATE) + @ApiOperation(value = "Update a logical table", notes = "Updates a logical table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully updated schema"), @ApiResponse(code = 404, message = "Schema " + + "not found"), @ApiResponse(code = 400, message = "Missing or invalid request body"), @ApiResponse(code = 500, + message = "Internal error") + 
}) + public SuccessResponse updateLogicalTable( + @ApiParam(value = "Name of the logical table", required = true) @PathParam("tableName") String tableName, + @Context HttpHeaders headers, String logicalTableJsonString) { + Pair<LogicalTableConfig, Map<String, Object>> logicalTableConfigAndUnrecognizedProps = + getLogicalAndUnrecognizedPropertiesFromJson(logicalTableJsonString); + LogicalTableConfig logicalTableConfig = logicalTableConfigAndUnrecognizedProps.getLeft(); + + Preconditions.checkArgument(logicalTableConfig.getTableName().equals(tableName), + "Logical table name in the request body should match the table name in the URL"); + + tableName = DatabaseUtils.translateTableName(tableName, headers); + logicalTableConfig.setTableName(tableName); + + SuccessResponse successResponse = updateLogicalTable(tableName, logicalTableConfig); + return new ConfigSuccessResponse(successResponse.getStatus(), logicalTableConfigAndUnrecognizedProps.getRight()); + } + + @DELETE + @Produces(MediaType.APPLICATION_JSON) + @Path("/logicalTables/{tableName}") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.DELETE_TABLE) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Delete a logical table", notes = "Deletes a logical table by name") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully deleted logical table"), @ApiResponse(code = 404, message = + "Logical table not found"), @ApiResponse(code = 500, message = "Error deleting logical table") + }) + public SuccessResponse deleteLogicalTable( + @ApiParam(value = "Logical table name", required = true) @PathParam("tableName") String tableName, + @Context HttpHeaders headers) { + tableName = DatabaseUtils.translateTableName(tableName, headers); + if (_pinotHelixResourceManager.deleteLogicalTable(tableName)) { + return new SuccessResponse(tableName + " logical table successfully deleted."); + } else { + throw new ControllerApplicationException(LOGGER, "Failed to 
delete logical table", + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + private Pair<LogicalTableConfig, Map<String, Object>> getLogicalAndUnrecognizedPropertiesFromJson( + String logicalTableConfigJsonString) + throws ControllerApplicationException { + try { + return JsonUtils.stringToObjectAndUnrecognizedProperties(logicalTableConfigJsonString, LogicalTableConfig.class); + } catch (Exception e) { + String msg = + String.format("Invalid logical table json config: %s. Reason: %s", logicalTableConfigJsonString, + e.getMessage()); + throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e); + } + } + + private SuccessResponse addLogicalTable(LogicalTableConfig logicalTableConfig) { + String tableName = logicalTableConfig.getTableName(); + try { + if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) { + logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT); + } + + LogicalTableUtils.validateLogicalTableName( + logicalTableConfig, + _pinotHelixResourceManager.getAllTables(), + _pinotHelixResourceManager.getAllBrokerTenantNames() + ); + _pinotHelixResourceManager.addLogicalTable(logicalTableConfig); + return new SuccessResponse(tableName + " logical table successfully added."); + } catch (TableAlreadyExistsException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } catch (IllegalArgumentException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + "Failed to add new logical table " + tableName + ". 
Reason: " + e.getMessage(), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + private SuccessResponse updateLogicalTable(String tableName, LogicalTableConfig logicalTableConfig) { + try { + if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) { + logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT); + } + + LogicalTableUtils.validateLogicalTableName( + logicalTableConfig, + _pinotHelixResourceManager.getAllTables(), + _pinotHelixResourceManager.getAllBrokerTenantNames() + ); + _pinotHelixResourceManager.updateLogicalTable(logicalTableConfig); + return new SuccessResponse(logicalTableConfig.getTableName() + " logical table successfully updated."); + } catch (TableNotFoundException e) { + throw new ControllerApplicationException(LOGGER, "Failed to find logical table " + tableName, + Response.Status.NOT_FOUND, e); + } catch (IllegalArgumentException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + "Failed to update logical table " + tableName + ". 
Reason: " + e.getMessage(), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 830fd88055..9bdfb8a254 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -176,6 +176,7 @@ import org.apache.pinot.spi.config.user.ComponentType; import org.apache.pinot.spi.config.user.RoleType; import org.apache.pinot.spi.config.user.UserConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; @@ -2205,6 +2206,64 @@ public class PinotHelixResourceManager { } } + public void addLogicalTable(LogicalTableConfig logicalTableConfig) + throws TableAlreadyExistsException { + String tableName = logicalTableConfig.getTableName(); + LOGGER.info("Adding logical table: {}", tableName); + + // Check if the logical table name is already used + LogicalTableConfig existingLogicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); + if (existingLogicalTableConfig != null) { + throw new TableAlreadyExistsException("Logical table: " + tableName + " already exists"); + } + + // Check if the table name is already used by a physical table + getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals) + .findFirst().ifPresent(tableNameWithType -> { + throw new TableAlreadyExistsException("Table name: " + tableName + " already exists"); + }); + + ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); + 
LOGGER.info("Added logical table: {}", tableName); + } + + public void updateLogicalTable(LogicalTableConfig logicalTableConfig) + throws TableNotFoundException { + String tableName = logicalTableConfig.getTableName(); + LOGGER.info("Updating logical table: {}", tableName); + + LogicalTableConfig oldLogicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); + if (oldLogicalTableConfig == null) { + throw new TableNotFoundException("Logical table: " + tableName + " does not exist"); + } + + ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); + LOGGER.info("Updated logical table: {}", tableName); + } + + public boolean deleteLogicalTable(String tableName) { + LOGGER.info("Deleting logical table: {}", tableName); + boolean result = false; + String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForLogical(tableName); + if (_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) { + result = _propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT); + } else { + throw new ControllerApplicationException(LOGGER, + "Logical table: " + tableName + " does not exists.", Response.Status.NOT_FOUND); + } + LOGGER.info("Deleted logical table: {}", tableName); + return result; + } + + public LogicalTableConfig getLogicalTableConfig(String tableName) { + return ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); + } + + public List<String> getAllLogicalTableNames() { + return ZKMetadataProvider.getAllLogicalTableConfigs(_propertyStore).stream().map(LogicalTableConfig::getTableName) + .collect(Collectors.toList()); + } + /** * Returns the ZK metdata for the given jobId and jobType * @param jobId the id of the job diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java new file mode 
100644 index 0000000000..573e2984ef --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.api.resources; + +import java.util.Map; +import org.apache.pinot.controller.helix.ControllerRequestClient; + + +public class PinotAdminUserLogicalTableResourceTest extends PinotLogicalTableResourceTest { + + public static final String AUTH_TOKEN = "Basic YWRtaW46dmVyeXNlY3JldA====="; + public static final Map<String, String> AUTH_HEADER = Map.of("Authorization", AUTH_TOKEN); + + @Override + protected void overrideControllerConf(Map<String, Object> properties) { + properties.put("controller.admin.access.control.factory.class", + "org.apache.pinot.controller.api.access.BasicAuthAccessControlFactory"); + properties.put("controller.admin.access.control.principals", "admin"); + properties.put("controller.admin.access.control.principals.admin.password", "verysecret"); + } + + @Override + protected Map<String, String> getHeaders() { + return AUTH_HEADER; + } + + @Override + public ControllerRequestClient getControllerRequestClient() { + if (_controllerRequestClient == null) { + _controllerRequestClient = + new ControllerRequestClient(_controllerRequestURLBuilder, getHttpClient(), AUTH_HEADER); + } + return _controllerRequestClient; + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java new file mode 100644 index 0000000000..1f2d3a8204 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + + +public class PinotLogicalTableResourceTest extends ControllerTest { + + private static final String LOGICAL_TABLE_NAME = "test_logical_table"; + public static final String BROKER_TENANT = "DefaultTenant"; + protected ControllerRequestURLBuilder _controllerRequestURLBuilder; + + @BeforeClass + public void setUpClass() + throws Exception { + startZk(); + startController(); + addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); + addFakeServerInstancesToAutoJoinHelixCluster(1, true); + _controllerRequestURLBuilder = getControllerRequestURLBuilder(); + } + + @AfterClass + public void tearDownClass() { + stopController(); + stopZk(); + } + + @AfterMethod + public void tearDown() 
{ + // cleans up the physical tables after each testcase + cleanup(); + } + + @DataProvider + public Object[][] tableNamesProvider() { + return new Object[][]{ + {"test_logical_table", List.of("test_table_1", "test_table_2"), List.of("test_table_3")}, + {"test_logical_table", List.of("test_table_1", "db.test_table_2"), List.of("test_table_3")}, + {"test_logical_table", List.of("test_table_1", "test_table_2"), List.of("db.test_table_3")}, + {"test_logical_table", List.of("db.test_table_1", "db.test_table_2"), List.of("db.test_table_3")}, + {"test_table", List.of("db1.test_table", "db2.test_table"), List.of("db3.test_table")}, + {"db0.test_table", List.of("db1.test_table", "db2.test_table"), List.of("db3.test_table")}, + {"db.test_logical_table", List.of("test_table_1", "test_table_2"), List.of("test_table_3")}, + {"db.test_logical_table", List.of("test_table_1", "db.test_table_2"), List.of("test_table_3")}, + {"db.test_logical_table", List.of("test_table_1", "test_table_2"), List.of("db.test_table_3")}, + {"db.test_logical_table", List.of("db.test_table_1", "db.test_table_2"), List.of("db.test_table_3")}, + }; + } + + @Test(dataProvider = "tableNamesProvider") + public void testCreateUpdateDeleteLogicalTables(String logicalTableName, List<String> physicalTableNames, + List<String> physicalTablesToUpdate) + throws IOException { + // verify logical table does not exist + String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(logicalTableName); + String updateLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName); + String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName); + + // verify logical table does not exist + verifyLogicalTableDoesNotExists(getLogicalTableUrl); + + // setup physical and logical tables + List<String> physicalTableNamesWithType = createHybridTables(physicalTableNames); 
+ LogicalTableConfig + logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, physicalTableNamesWithType, BROKER_TENANT); + + // create logical table + String resp = + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + assertEquals(resp, + "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " logical table successfully added.\"}"); + + // verify logical table + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); + + // update logical table and setup new physical tables + List<String> tableNameToUpdateWithType = createHybridTables(physicalTablesToUpdate); + tableNameToUpdateWithType.addAll(physicalTableNamesWithType); + logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, tableNameToUpdateWithType, BROKER_TENANT); + + String response = + ControllerTest.sendPutRequest(updateLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + assertEquals(response, + "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " logical table successfully updated.\"}"); + + // verify updated logical table + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); + + // delete logical table + String deleteResponse = ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders()); + assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + " logical table successfully deleted.\"}"); + + // verify logical table is deleted + verifyLogicalTableDoesNotExists(getLogicalTableUrl); + } + + @Test + public void testLogicalTableValidationTests() + throws IOException { + String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + + // create physical tables + List<String> physicalTableNames = List.of("test_table_1", "test_table_2"); + List<String> physicalTableNamesWithType = createHybridTables(physicalTableNames); + + // Test logical table name with _OFFLINE and _REALTIME is not allowed + 
LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig("testLogicalTable_OFFLINE", physicalTableNamesWithType, BROKER_TENANT); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Reason: 'tableName' should not end with _OFFLINE or _REALTIME"), + e.getMessage()); + } + + logicalTableConfig = + getDummyLogicalTableConfig("testLogicalTable_REALTIME", physicalTableNamesWithType, BROKER_TENANT); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Reason: 'tableName' should not end with _OFFLINE or _REALTIME"), + e.getMessage()); + } + + // Test logical table name can not be same as existing physical table name + logicalTableConfig = + getDummyLogicalTableConfig("test_table_1", physicalTableNamesWithType, BROKER_TENANT); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Table name: test_table_1 already exists"), e.getMessage()); + } + + // Test empty physical table names is not allowed + logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), BROKER_TENANT); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("'physicalTableConfigMap' should not be null or empty"), e.getMessage()); + } + + // Test all table names are physical table names and none is hybrid table name + 
logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNames, BROKER_TENANT); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Reason: 'test_table_1' should be one of the existing tables"), + e.getMessage()); + } + + // Test valid broker tenant is provided + logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, "InvalidTenant"); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Reason: 'InvalidTenant' should be one of the existing broker tenants"), + e.getMessage()); + } + } + + @Test + public void testLogicalTableWithSameNameNotAllowed() + throws IOException { + String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME); + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_1", "test_table_2")); + + LogicalTableConfig + logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); + try { + // create the same logical table again + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Logical table: test_logical_table already exists"), e.getMessage()); + } + + // 
clean up the logical table + String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME); + ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders()); + verifyLogicalTableDoesNotExists(getLogicalTableUrl); + } + + @DataProvider + public Object[][] physicalTableShouldExistProvider() { + return new Object[][]{ + {LOGICAL_TABLE_NAME, List.of("test_table_1"), "unknown_table_OFFLINE"}, + {LOGICAL_TABLE_NAME, List.of("test_table_2"), "unknown_table_REALTIME"}, + {LOGICAL_TABLE_NAME, List.of("test_table_1"), "db.test_table_1_OFFLINE"}, + {LOGICAL_TABLE_NAME, List.of("test_table_2"), "db.test_table_2_REALTIME"}, + }; + } + + @Test(dataProvider = "physicalTableShouldExistProvider") + public void testPhysicalTableShouldExist(String logicalTableName, List<String> physicalTableNames, + String unknownTableName) + throws IOException { + String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + + // setup physical tables + List<String> physicalTableNamesWithType = createHybridTables(physicalTableNames); + physicalTableNamesWithType.add(unknownTableName); + + // Test physical table should exist + LogicalTableConfig + logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, physicalTableNamesWithType, BROKER_TENANT); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("'" + unknownTableName + "' should be one of the existing tables"), + e.getMessage()); + } + } + + @Test + public void testGetLogicalTableNames() + throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + String getLogicalTableNamesUrl = _controllerRequestURLBuilder.forLogicalTableNamesGet(); + String response = ControllerTest.sendGetRequest(getLogicalTableNamesUrl, getHeaders()); + assertEquals(response, 
objectMapper.writeValueAsString(List.of())); + + // setup physical tables and logical tables + List<String> logicalTableNames = List.of("db.test_logical_table_1", "test_logical_table_2", "test_logical_table_3"); + List<String> physicalTableNames = List.of("test_table_1", "test_table_2", "db.test_table_3"); + List<String> physicalTableNamesWithType = createHybridTables(physicalTableNames); + + for (int i = 0; i < logicalTableNames.size(); i++) { + LogicalTableConfig logicalTableConfig = getDummyLogicalTableConfig(logicalTableNames.get(i), List.of( + physicalTableNamesWithType.get(2 * i), physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT); + + ControllerTest.sendPostRequest(_controllerRequestURLBuilder.forLogicalTableCreate(), + logicalTableConfig.toSingleLineJsonString(), getHeaders()); + } + + // verify logical table names + String getLogicalTableNamesResponse = ControllerTest.sendGetRequest(getLogicalTableNamesUrl, getHeaders()); + assertEquals(getLogicalTableNamesResponse, objectMapper.writeValueAsString(logicalTableNames)); + + // cleanup: delete logical tables + for (String logicalTableName : logicalTableNames) { + String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName); + String deleteResponse = ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders()); + assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + " logical table successfully deleted.\"}"); + } + } + + private void verifyLogicalTableExists(String getLogicalTableUrl, LogicalTableConfig logicalTableConfig) + throws IOException { + LogicalTableConfig remoteLogicalTableConfig = + LogicalTableConfig.fromString(ControllerTest.sendGetRequest(getLogicalTableUrl, getHeaders())); + assertEquals(remoteLogicalTableConfig, logicalTableConfig); + } + + private void verifyLogicalTableDoesNotExists(String getLogicalTableUrl) { + try { + ControllerTest.sendGetRequest(getLogicalTableUrl, getHeaders()); + fail("Logical Table GET request 
should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Logical table not found"), e.getMessage()); + } + } + + protected Map<String, String> getHeaders() { + return Map.of(); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java new file mode 100644 index 0000000000..b8f0473267 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.api.resources; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.controller.helix.ControllerRequestClient; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + + +public class PinotUserWithAccessLogicalTableResourceTest extends ControllerTest { + + public static final String AUTH_TOKEN = "Basic YWRtaW46dmVyeXNlY3JldA====="; + public static final String AUTH_TOKEN_USER = "Basic dXNlcjpzZWNyZXQ=="; + public static final Map<String, String> AUTH_HEADER = Map.of("Authorization", AUTH_TOKEN); + public static final Map<String, String> AUTH_HEADER_USER = Map.of("Authorization", AUTH_TOKEN_USER); + public static final String LOGICAL_TABLE_NAME = "test_logical_table"; + + private Map<String, Object> getControllerConf(Object permissions) { + Map<String, Object> properties = new HashMap<>(); + properties.put("controller.admin.access.control.factory.class", + "org.apache.pinot.controller.api.access.BasicAuthAccessControlFactory"); + properties.put("controller.admin.access.control.principals", "admin,user"); + properties.put("controller.admin.access.control.principals.admin.password", "verysecret"); + properties.put("controller.admin.access.control.principals.user.password", "secret"); + properties.put("controller.admin.access.control.principals.user.permissions", permissions); + return properties; + } + + protected Map<String, String> getHeaders() { + return AUTH_HEADER_USER; + } + + @Override + public ControllerRequestClient getControllerRequestClient() { + if (_controllerRequestClient == null) { + _controllerRequestClient = + new 
ControllerRequestClient(_controllerRequestURLBuilder, getHttpClient(), AUTH_HEADER); + } + return _controllerRequestClient; + } + + private void setup(Map<String, Object> properties) + throws Exception { + startZk(); + Map<String, Object> configuration = getDefaultControllerConfiguration(); + configuration.putAll(properties); + startController(configuration); + addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); + addFakeServerInstancesToAutoJoinHelixCluster(1, true); + _controllerRequestURLBuilder = getControllerRequestURLBuilder(); + } + + @AfterMethod + private void tearDown() { + cleanup(); + String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME); + try { + ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, AUTH_HEADER); + } catch (Exception e) { + // ignore + } + stopController(); + stopZk(); + } + + @DataProvider + public Object[][] permissionsProvider() { + return new Object[][]{ + {"read,create"}, + {"read,create,update"}, + {"read,create,update,delete"} + }; + } + + @Test(dataProvider = "permissionsProvider") + public void testUserWithCreateAccess(String permissions) + throws Exception { + Map<String, Object> properties = getControllerConf(permissions); + + setup(properties); + + String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME); + String updateLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableUpdate(LOGICAL_TABLE_NAME); + String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME); + + List<String> physicalTableNames = List.of("test_table_1"); + List<String> physicalTablesWithType = createHybridTables(physicalTableNames); + LogicalTableConfig logicalTableConfig; + + // create logical table + try { + logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTablesWithType, "DefaultTenant"); + String resp 
= + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + if (permissions.contains("create")) { + assertEquals(resp, + "{\"unrecognizedProperties\":{},\"status\":\"" + LOGICAL_TABLE_NAME + + " logical table successfully added.\"}"); + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); + } else { + fail("Logical Table POST request should have failed"); + } + } catch (Exception e) { + assertTrue(e.getMessage().contains("Permission is denied for CREATE"), e.getMessage()); + } + + // update logical table + try { + physicalTablesWithType.addAll(createHybridTables(List.of("test_table_2"))); + logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTablesWithType, "DefaultTenant"); + String respUpdate = ControllerTest.sendPutRequest( + updateLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders() + ); + if (permissions.contains("update")) { + assertEquals(respUpdate, + "{\"unrecognizedProperties\":{},\"status\":\"" + LOGICAL_TABLE_NAME + + " logical table successfully updated.\"}"); + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); + } else { + fail("Logical Table POST request should have failed"); + } + } catch (Exception e) { + assertTrue(e.getMessage().contains("Permission is denied for UPDATE"), e.getMessage()); + } + + // delete logical table + try { + String respDelete = ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders()); + if (permissions.contains("delete")) { + assertEquals(respDelete, "{\"status\":\"" + LOGICAL_TABLE_NAME + " logical table successfully deleted.\"}"); + } else { + fail("Logical Table DELETE request should have failed"); + } + } catch (Exception e) { + assertTrue(e.getMessage().contains("Permission is denied for DELETE"), e.getMessage()); + } + } + + private void verifyLogicalTableExists(String logicalTableNamesGet, LogicalTableConfig logicalTableConfig) + throws IOException { + String respGet = 
ControllerTest.sendGetRequest(logicalTableNamesGet, getHeaders()); + LogicalTableConfig remoteTable = LogicalTableConfig.fromString(respGet); + assertEquals(remoteTable, logicalTableConfig); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index a5b05031c6..effcf823b7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -69,12 +69,15 @@ import org.apache.pinot.controller.api.access.AllowAllAccessFactory; import org.apache.pinot.controller.api.resources.PauseStatusDetails; import org.apache.pinot.controller.api.resources.TableViews; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; @@ -82,6 +85,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; 
import org.mockito.MockedStatic; @@ -159,6 +164,21 @@ public class ControllerTest { return DEFAULT_INSTANCE; } + public List<String> createHybridTables(List<String> tableNames) + throws IOException { + List<String> tableNamesWithType = new ArrayList<>(); + for (String tableName : tableNames) { + addDummySchema(tableName); + TableConfig offlineTable = createDummyTableConfig(tableName, TableType.OFFLINE); + TableConfig realtimeTable = createDummyTableConfig(tableName, TableType.REALTIME); + addTableConfig(offlineTable); + addTableConfig(realtimeTable); + tableNamesWithType.add(offlineTable.getTableName()); + tableNamesWithType.add(realtimeTable.getTableName()); + } + return tableNamesWithType; + } + public String getHelixClusterName() { return _clusterName; } @@ -369,6 +389,19 @@ public class ControllerTest { } } + public static LogicalTableConfig getDummyLogicalTableConfig(String tableName, List<String> physicalTableNames, + String brokerTenant) { + Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); + for (String physicalTableName : physicalTableNames) { + physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig()); + } + LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder() + .setTableName(tableName) + .setBrokerTenant(brokerTenant) + .setPhysicalTableConfigMap(physicalTableConfigMap); + return builder.build(); + } + public static class FakeBrokerResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { private static final String STATE_MODEL_DEF = "BrokerResourceOnlineOfflineStateModel"; @@ -649,6 +682,19 @@ public class ControllerTest { return schema; } + public static TableConfig createDummyTableConfig(String tableName, TableType tableType) { + TableConfigBuilder builder = new TableConfigBuilder(tableType); + if (tableType == TableType.REALTIME) { + builder.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()); + } + return 
builder.setTableName(tableName) + .setTimeColumnName("timeColumn") + .setTimeType("DAYS") + .setRetentionTimeUnit("DAYS") + .setRetentionTimeValue("5") + .build(); + } + public static Schema createDummySchemaWithPrimaryKey(String tableName) { Schema schema = createDummySchema(tableName); schema.setPrimaryKeyColumns(Collections.singletonList("dimA")); @@ -1184,6 +1230,12 @@ public class ControllerTest { * test functionality. */ public void cleanup() { + // Delete logical tables + List<String> logicalTables = _helixResourceManager.getAllLogicalTableNames(); + for (String logicalTableName : logicalTables) { + _helixResourceManager.deleteLogicalTable(logicalTableName); + } + // Delete all tables List<String> tables = _helixResourceManager.getAllTables(); for (String tableNameWithType : tables) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java index 56fe2464cf..dd5e217417 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java @@ -22,14 +22,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener; import org.apache.pinot.spi.config.provider.SchemaChangeListener; import org.apache.pinot.spi.config.provider.TableConfigChangeListener; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.data.Schema; import 
org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn; +import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; @@ -44,10 +48,14 @@ import static org.testng.Assert.*; public class TableCacheTest { private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance(); private static final String RAW_TABLE_NAME = "cacheTestTable"; + private static final String ANOTHER_TABLE = "anotherTable"; + private static final String LOGICAL_TABLE_NAME = "cacheLogicalTestTable"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); + private static final String ANOTHER_TABLE_OFFLINE = TableNameBuilder.OFFLINE.tableNameWithType(ANOTHER_TABLE); private static final String MANGLED_RAW_TABLE_NAME = "cAcHeTeStTaBlE"; + private static final String MANGLED_LOGICAL_TABLE_NAME = "cAcHeLoGiCaLTeStTaBlE"; private static final String MANGLED_OFFLINE_TABLE_NAME = MANGLED_RAW_TABLE_NAME + "_oFfLiNe"; @BeforeClass @@ -65,6 +73,7 @@ public class TableCacheTest { assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME)); assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME)); assertNull(tableCache.getActualTableName(RAW_TABLE_NAME)); + assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME)); // Add a schema Schema schema = @@ -107,18 +116,43 @@ public class TableCacheTest { assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema); assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap); + // Add logical table + LogicalTableConfig logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME)); + 
TEST_INSTANCE.getHelixResourceManager().addLogicalTable(logicalTableConfig); + // Wait for at most 10 seconds for the callback to add the logical table to the cache + TestUtils.waitForCondition(aVoid -> tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME) != null, 10_000L, + "Failed to add the logical table to the cache"); + // Logical table can be accessed by the logical table name + if (isCaseInsensitive) { + assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), LOGICAL_TABLE_NAME); + assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), logicalTableConfig); + assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), expectedSchema); + } else { + assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME)); + } + assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), logicalTableConfig); + assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema); + // Register the change listeners TestTableConfigChangeListener tableConfigChangeListener = new TestTableConfigChangeListener(); assertTrue(tableCache.registerTableConfigChangeListener(tableConfigChangeListener)); assertEquals(tableConfigChangeListener._tableConfigList.size(), 1); assertEquals(tableConfigChangeListener._tableConfigList.get(0), tableConfig); + TestSchemaChangeListener schemaChangeListener = new TestSchemaChangeListener(); assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener)); assertEquals(schemaChangeListener._schemaList.size(), 1); assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema); + + TestLogicalTableConfigChangeListener logicalTableConfigChangeListener = new TestLogicalTableConfigChangeListener(); + assertTrue(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener)); + assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.size(), 1); + assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.get(0), 
logicalTableConfig); + // Re-register the change listener should fail assertFalse(tableCache.registerTableConfigChangeListener(tableConfigChangeListener)); assertFalse(tableCache.registerSchemaChangeListener(schemaChangeListener)); + assertFalse(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener)); // Update the schema schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true)); @@ -174,8 +208,34 @@ public class TableCacheTest { // waitForEVToDisappear() call TEST_INSTANCE.waitForEVToAppear(OFFLINE_TABLE_NAME); + // Update logical table config (create schema and table config for anotherTable) + Schema anotherTableSchema = + new Schema.SchemaBuilder().setSchemaName(ANOTHER_TABLE).addSingleValueDimension("testColumn", DataType.INT) + .build(); + TableConfig anotherTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(ANOTHER_TABLE_OFFLINE).build(); + TEST_INSTANCE.getHelixResourceManager().addSchema(anotherTableSchema, false, false); + TEST_INSTANCE.getHelixResourceManager().addTable(anotherTableConfig); + TEST_INSTANCE.waitForEVToAppear(ANOTHER_TABLE_OFFLINE); + // Wait for at most 10 seconds for the callback to add the table config to the cache + TestUtils.waitForCondition( + aVoid -> anotherTableConfig.equals(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE)), 10_000L, + "Failed to add the table config to the cache"); + // update the logical table + logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE)); + TEST_INSTANCE.getHelixResourceManager().updateLogicalTable(logicalTableConfig); + if (isCaseInsensitive) { + assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), logicalTableConfig); + assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), expectedSchema); + } else { + assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME)); + } + 
assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), logicalTableConfig); + assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema); + // Remove the table config TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME); + TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(ANOTHER_TABLE_OFFLINE); // Wait for at most 10 seconds for the callback to remove the table config from the cache // NOTE: // - Verify if the callback is fully done by checking the table config change lister because it is the last step of @@ -183,27 +243,55 @@ public class TableCacheTest { TestUtils.waitForCondition(aVoid -> tableConfigChangeListener._tableConfigList.isEmpty(), 10_000L, "Failed to remove the table config from the cache"); assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME)); + assertNull(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE)); assertNull(tableCache.getActualTableName(RAW_TABLE_NAME)); assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema); assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap); // Remove the schema TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME); + TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE); // Wait for at most 10 seconds for the callback to remove the schema from the cache // NOTE: // - Verify if the callback is fully done by checking the schema change lister because it is the last step of the // callback handling TestUtils.waitForCondition(aVoid -> schemaChangeListener._schemaList.isEmpty(), 10_000L, "Failed to remove the schema from the cache"); + + // Remove logical table + TEST_INSTANCE.getHelixResourceManager().deleteLogicalTable(LOGICAL_TABLE_NAME); + // Wait for at most 10 seconds for the callback to remove the logical table from the cache + // NOTE: + // - Verify if the callback is fully done by checking the logical table config change listener because it is the last step of + // the callback handling +
TestUtils.waitForCondition(aVoid -> logicalTableConfigChangeListener._logicalTableConfigList.isEmpty(), 10_000L, + "Failed to remove the logical table from the cache"); + assertNull(tableCache.getSchema(RAW_TABLE_NAME)); + assertNull(tableCache.getSchema(ANOTHER_TABLE)); assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME)); assertNull(tableCache.getSchema(RAW_TABLE_NAME)); assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME)); + assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME)); assertEquals(schemaChangeListener._schemaList.size(), 0); assertEquals(tableConfigChangeListener._tableConfigList.size(), 0); + assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.size(), 0); // Wait for external view to disappear to ensure a clean start for the next test TEST_INSTANCE.waitForEVToDisappear(OFFLINE_TABLE_NAME); + TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE); + } + + private static LogicalTableConfig getLogicalTableConfig(String tableName, List<String> physicalTableNames) { + Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); + for (String physicalTableName : physicalTableNames) { + physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig()); + } + LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder() + .setTableName(tableName) + .setBrokerTenant("DefaultTenant") + .setPhysicalTableConfigMap(physicalTableConfigMap); + return builder.build(); } @DataProvider(name = "testTableCacheDataProvider") @@ -229,6 +317,15 @@ public class TableCacheTest { } } + private static class TestLogicalTableConfigChangeListener implements LogicalTableConfigChangeListener { + private volatile List<LogicalTableConfig> _logicalTableConfigList; + + @Override + public void onChange(List<LogicalTableConfig> logicalTableConfigList) { + _logicalTableConfigList = logicalTableConfigList; + } + } + @AfterClass public void tearDown() { TEST_INSTANCE.cleanup(); diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java new file mode 100644 index 0000000000..adf4b990db --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.config.provider; + +import java.util.List; +import org.apache.pinot.spi.data.LogicalTableConfig; + + +public interface LogicalTableConfigChangeListener { + /** + * The callback to be invoked on logical table changes + * @param logicalTableConfigList the entire list of logical tables in the cluster + */ + void onChange(List<LogicalTableConfig> logicalTableConfigList); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java index 0400fe0498..64de5ada56 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java @@ -20,6 +20,7 @@ package org.apache.pinot.spi.config.provider; import java.util.List; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; @@ -57,4 +58,19 @@ public interface PinotConfigProvider { * registered. */ boolean registerSchemaChangeListener(SchemaChangeListener schemaChangeListener); + + /** + * Returns the logical table config for the given logical table name. + * @param logicalTableName the name of the logical table + * @return the logical table + */ + LogicalTableConfig getLogicalTableConfig(String logicalTableName); + + /** + * Registers the {@link LogicalTableConfigChangeListener} and notifies it whenever any changes (addition, update, removal) to any of the logical table configs are detected. + * @param logicalTableConfigChangeListener the listener to be registered + * @return {@code true} if the listener is successfully registered, {@code false} if the listener is already + * registered.
+ */ + boolean registerLogicalTableConfigChangeListener(LogicalTableConfigChangeListener logicalTableConfigChangeListener); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java new file mode 100644 index 0000000000..4d477691d1 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.data; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import org.apache.pinot.spi.config.BaseJsonConfig; +import org.apache.pinot.spi.utils.JsonUtils; + + +@JsonIgnoreProperties(ignoreUnknown = true) +public class LogicalTableConfig extends BaseJsonConfig { + + private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); + + public static final String LOGICAL_TABLE_NAME_KEY = "tableName"; + public static final String PHYSICAL_TABLE_CONFIG_KEY = "physicalTableConfigMap"; + public static final String BROKER_TENANT_KEY = "brokerTenant"; + + private String _tableName; + private String _brokerTenant; + private Map<String, PhysicalTableConfig> _physicalTableConfigMap; + + public static LogicalTableConfig fromString(String logicalTableString) + throws IOException { + return JsonUtils.stringToObject(logicalTableString, LogicalTableConfig.class); + } + + public String getTableName() { + return _tableName; + } + + public void setTableName(String tableName) { + _tableName = tableName; + } + + public Map<String, PhysicalTableConfig> getPhysicalTableConfigMap() { + return _physicalTableConfigMap; + } + + public void setPhysicalTableConfigMap( + Map<String, PhysicalTableConfig> physicalTableConfigMap) { + _physicalTableConfigMap = physicalTableConfigMap; + } + + public String getBrokerTenant() { + return _brokerTenant; + } + + public void setBrokerTenant(String brokerTenant) { + _brokerTenant = brokerTenant; + } + + private JsonNode toJsonObject() { + return DEFAULT_MAPPER.valueToTree(this); + } + + /** + * Returns a single-line json string representation of the logical table config.
+ */ + public String toSingleLineJsonString() { + return toJsonObject().toString(); + } + + /** + * Returns a pretty json string representation of the logical table config. + */ + public String toPrettyJsonString() { + try { + return JsonUtils.objectToPrettyString(toJsonObject()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + LogicalTableConfig that = (LogicalTableConfig) object; + return Objects.equals(getTableName(), that.getTableName()); + } + + @Override + public int hashCode() { + return Objects.hash(getTableName()); + } + + @Override + public String toString() { + return toSingleLineJsonString(); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java new file mode 100644 index 0000000000..c86fcf97dc --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.spi.data; + +import org.apache.pinot.spi.config.BaseJsonConfig; + + +/** + * This class represents the configuration for a physical table in {@link LogicalTableConfig}. + * It is intentionally empty for now; fields and documentation will be added as features are introduced. + */ +public class PhysicalTableConfig extends BaseJsonConfig { +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 2804eac53e..eb1b7d3e17 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -633,4 +633,24 @@ public class ControllerRequestURLBuilder { public String forIdealState(String tableName) { return StringUtil.join("/", _baseUrl, "tables", tableName, "idealstate"); } + + public String forLogicalTableCreate() { + return StringUtil.join("/", _baseUrl, "logicalTables"); + } + + public String forLogicalTableUpdate(String logicalTableName) { + return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName); + } + + public String forLogicalTableGet(String logicalTableName) { + return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName); + } + + public String forLogicalTableNamesGet() { + return StringUtil.join("/", _baseUrl, "logicalTables"); + } + + public String forLogicalTableDelete(String logicalTableName) { + return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java new file mode 100644 index 0000000000..eff47c5af6 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java @@ -0,0 +1,53 @@ +/** +
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.utils.builder; + +import java.util.Map; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.PhysicalTableConfig; + + +public class LogicalTableConfigBuilder { + private String _tableName; + private Map<String, PhysicalTableConfig> _physicalTableConfigMap; + private String _brokerTenant; + + public LogicalTableConfigBuilder setTableName(String tableName) { + _tableName = tableName; + return this; + } + + public LogicalTableConfigBuilder setPhysicalTableConfigMap(Map<String, PhysicalTableConfig> physicalTableConfigMap) { + _physicalTableConfigMap = physicalTableConfigMap; + return this; + } + + public LogicalTableConfigBuilder setBrokerTenant(String brokerTenant) { + _brokerTenant = brokerTenant; + return this; + } + + public LogicalTableConfig build() { + LogicalTableConfig logicalTableConfig = new LogicalTableConfig(); + logicalTableConfig.setTableName(_tableName); + logicalTableConfig.setPhysicalTableConfigMap(_physicalTableConfigMap); + logicalTableConfig.setBrokerTenant(_brokerTenant); + return logicalTableConfig; + } +} --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org