Jackie-Jiang commented on a change in pull request #6840: URL: https://github.com/apache/incubator-pinot/pull/6840#discussion_r620748224
########## File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/TableConfigs.java ########## @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; + + +/** + * Wrapper for all configs of a table, which include the offline table config, realtime table config and schema. + * This helps look at and operate on the pinot table configs as a whole unit. + */ +public class TableConfigs extends BaseJsonConfig { + private final String _tableName; + private final Schema _schema; + private final TableConfig _offline; + private final TableConfig _realtime; + + @JsonCreator + public TableConfigs(@JsonProperty(value = "tableName", required = true) String tableName, + @JsonProperty(value = "schema", required = true) Schema schema, + @JsonProperty(value = "offline") @Nullable TableConfig offline, + @JsonProperty(value = "realtime") @Nullable TableConfig realtime) { + _tableName = tableName; + _offline = offline; + _realtime = realtime; + _schema = schema; Review comment: (ocd) move this after `_tableName` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java ########## @@ -425,12 +414,14 @@ public String checkTableConfig(String tableConfigStr) { return validateConfig(tableConfig, _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig)); } + @Deprecated @POST @Path("/tables/validateTableAndSchema") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @ApiOperation(value = "Validate table config for a table along with specified schema", notes = - "Validate given table config and schema. If specified schema is null, attempt to retrieve schema using the " + "Deprecated. Use /configs/validate instead." Review comment: Update the api path ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); Review comment: We don't usually return pretty string for rest API, use `toJsonString()` instead ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Creates a {@link TableConfigs} using the <code>tableConfigsStr</code>, by creating the schema, + * followed by the realtime tableConfig and offline tableConfig as applicable, from the {@link TableConfigs}. + * Validates the configs before applying. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json", notes = "Add the TableConfigs using the tableConfigsStr json") + public SuccessResponse addConfig(String tableConfigsStr, @Context HttpHeaders httpHeaders, @Context Request request) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + String endpointUrl = request.getRequestURL().toString(); + validatePermissions(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl); Review comment: (nit) No need to wrap this one line call into a function, `AccessControl` can be reused here ```suggestion AccessControl accessControl = _accessControlFactory.create(); _accessControlUtils .validatePermission(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl, accessControl); ... ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Creates a {@link TableConfigs} using the <code>tableConfigsStr</code>, by creating the schema, + * followed by the realtime tableConfig and offline tableConfig as applicable, from the {@link TableConfigs}. + * Validates the configs before applying. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json", notes = "Add the TableConfigs using the tableConfigsStr json") + public SuccessResponse addConfig(String tableConfigsStr, @Context HttpHeaders httpHeaders, @Context Request request) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + String endpointUrl = request.getRequestURL().toString(); + validatePermissions(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + validatePermissions(offlineTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + validatePermissions(realtimeTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + + _pinotHelixResourceManager.addSchema(schema, true); + LOGGER.info("Added schema: {}", schema.getSchemaName()); + if (offlineTableConfig != null) { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Added offline table config: {}", offlineTableConfig.getTableName()); + } + if (realtimeTableConfig != null) { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Added realtime table config: {}", realtimeTableConfig.getTableName()); + } + + return new SuccessResponse("TableConfigs " + tableConfigs.getTableName() + " successfully added"); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_ADD_ERROR, 1L); + if (e instanceof PinotHelixResourceManager.InvalidTableConfigException) { + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs: %s", tableConfigs.getTableName()), Response.Status.BAD_REQUEST, e); + } else if (e instanceof PinotHelixResourceManager.TableAlreadyExistsException) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } else { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + /** + * Deletes the {@link TableConfigs} by deleting the schema tableName, the offline table config for tableName_OFFLINE and + * the realtime table config for tableName_REALTIME + */ + @DELETE + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.DELETE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete the TableConfigs", notes = "Delete the TableConfigs") + public SuccessResponse deleteConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName) { + + try { Review comment: Check table existence before deletion ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Creates a {@link TableConfigs} using the <code>tableConfigsStr</code>, by creating the schema, + * followed by the realtime tableConfig and offline tableConfig as applicable, from the {@link TableConfigs}. + * Validates the configs before applying. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json", notes = "Add the TableConfigs using the tableConfigsStr json") + public SuccessResponse addConfig(String tableConfigsStr, @Context HttpHeaders httpHeaders, @Context Request request) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + String endpointUrl = request.getRequestURL().toString(); + validatePermissions(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + validatePermissions(offlineTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + validatePermissions(realtimeTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + + _pinotHelixResourceManager.addSchema(schema, true); + LOGGER.info("Added schema: {}", schema.getSchemaName()); + if (offlineTableConfig != null) { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Added offline table config: {}", offlineTableConfig.getTableName()); + } + if (realtimeTableConfig != null) { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Added realtime table config: {}", realtimeTableConfig.getTableName()); + } + + return new SuccessResponse("TableConfigs " + tableConfigs.getTableName() + " successfully added"); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_ADD_ERROR, 1L); + if (e instanceof PinotHelixResourceManager.InvalidTableConfigException) { + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs: %s", tableConfigs.getTableName()), Response.Status.BAD_REQUEST, e); + } else if (e instanceof PinotHelixResourceManager.TableAlreadyExistsException) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } else { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + /** + * Deletes the {@link TableConfigs} by deleting the schema tableName, the offline table config for tableName_OFFLINE and + * the realtime table config for tableName_REALTIME + */ + @DELETE + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.DELETE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete the TableConfigs", notes = "Delete the TableConfigs") + public SuccessResponse deleteConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + _pinotHelixResourceManager.deleteRealtimeTable(tableName); + LOGGER.info("Deleted realtime table: {}", tableName); + _pinotHelixResourceManager.deleteOfflineTable(tableName); + LOGGER.info("Deleted offline table: {}", tableName); + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + if (schema != null) { + _pinotHelixResourceManager.deleteSchema(schema); + } + LOGGER.info("Deleted schema: {}", tableName); + return new SuccessResponse("Deleted TableConfigs: " + tableName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Updated the {@link TableConfigs} by updating the schema tableName, + * then updating the offline tableConfig or creating a new one if it doesn't already exist in the cluster, + * then updating the realtime tableConfig or creating a new one if it doesn't already exist in the cluster. + */ + @PUT + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.UPDATE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Update the TableConfigs provided by the tableConfigsStr json", notes = "Update the TableConfigs provided by the tableConfigsStr json") + public SuccessResponse updateConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Reload the table if the new schema is backward compatible") @DefaultValue("false") @QueryParam("reload") boolean reload, + String tableConfigsStr) + throws Exception { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + Preconditions.checkState(tableConfigs.getTableName().equals(tableName), + "'tableName' in TableConfigs: %s must match provided tableName: %s", tableConfigs.getTableName(), tableName); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs: %s", tableName), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { Review comment: Check table existence first ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java ########## @@ -165,9 +146,13 @@ public SuccessResponse addTable(String tableConfigStr, @Context HttpHeaders http throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e); } try { - ensureMinReplicas(tableConfig); - ensureStorageQuotaConstraints(tableConfig); - verifyTableConfigs(tableConfig); + try { + TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); + TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); + checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableName), tableConfig); + } catch (Exception e) { + throw new PinotHelixResourceManager.InvalidTableConfigException(e); Review comment: +1 ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Creates a {@link TableConfigs} using the <code>tableConfigsStr</code>, by creating the schema, + * followed by the realtime tableConfig and offline tableConfig as applicable, from the {@link TableConfigs}. + * Validates the configs before applying. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json", notes = "Add the TableConfigs using the tableConfigsStr json") + public SuccessResponse addConfig(String tableConfigsStr, @Context HttpHeaders httpHeaders, @Context Request request) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + String endpointUrl = request.getRequestURL().toString(); + validatePermissions(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + validatePermissions(offlineTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + validatePermissions(realtimeTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + + _pinotHelixResourceManager.addSchema(schema, true); + LOGGER.info("Added schema: {}", schema.getSchemaName()); + if (offlineTableConfig != null) { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Added offline table config: {}", offlineTableConfig.getTableName()); + } + if (realtimeTableConfig != null) { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Added realtime table config: {}", realtimeTableConfig.getTableName()); + } + + return new SuccessResponse("TableConfigs " + tableConfigs.getTableName() + " successfully added"); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_ADD_ERROR, 1L); + if (e instanceof PinotHelixResourceManager.InvalidTableConfigException) { + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs: %s", tableConfigs.getTableName()), Response.Status.BAD_REQUEST, e); + } else if (e instanceof PinotHelixResourceManager.TableAlreadyExistsException) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } else { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + /** + * Deletes the {@link TableConfigs} by deleting the schema tableName, the offline table config for tableName_OFFLINE and + * the realtime table config for tableName_REALTIME + */ + @DELETE + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.DELETE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete the TableConfigs", notes = "Delete the TableConfigs") + public SuccessResponse deleteConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + _pinotHelixResourceManager.deleteRealtimeTable(tableName); + LOGGER.info("Deleted realtime table: {}", tableName); + _pinotHelixResourceManager.deleteOfflineTable(tableName); + LOGGER.info("Deleted offline table: {}", tableName); + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + if (schema != null) { + _pinotHelixResourceManager.deleteSchema(schema); + } + LOGGER.info("Deleted schema: {}", tableName); + return new SuccessResponse("Deleted TableConfigs: " + tableName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Updated the {@link TableConfigs} by updating the schema tableName, + * then updating the offline tableConfig or creating a new one if it doesn't already exist in the cluster, + * then updating the realtime tableConfig or creating a new one if it doesn't already exist in the cluster. + */ + @PUT + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.UPDATE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Update the TableConfigs provided by the tableConfigsStr json", notes = "Update the TableConfigs provided by the tableConfigsStr json") + public SuccessResponse updateConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Reload the table if the new schema is backward compatible") @DefaultValue("false") @QueryParam("reload") boolean reload, + String tableConfigsStr) + throws Exception { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + Preconditions.checkState(tableConfigs.getTableName().equals(tableName), + "'tableName' in TableConfigs: %s must match provided tableName: %s", tableConfigs.getTableName(), tableName); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs: %s", tableName), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + _pinotHelixResourceManager.updateSchema(schema, reload); + LOGGER.info("Updated schema: {}", tableName); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + if (_pinotHelixResourceManager.hasOfflineTable(tableName)) { + _pinotHelixResourceManager.updateTableConfig(offlineTableConfig); + LOGGER.info("Updated offline table config: {}", tableName); + } else { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Created offline table config: {}", tableName); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) { + _pinotHelixResourceManager.updateTableConfig(realtimeTableConfig); + LOGGER.info("Updated realtime table config: {}", tableName); + } else { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Created realtime table config: {}", tableName); + } + } + } + } catch (PinotHelixResourceManager.InvalidTableConfigException e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L); + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs for: %s, %s", tableName, e.getMessage()), Response.Status.BAD_REQUEST, e); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L); + throw new ControllerApplicationException(LOGGER, + String.format("Failed to update TableConfigs for: %s, %s", tableName, e.getMessage()), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + + return new SuccessResponse("TableConfigs updated for " + tableName); + } + + /** + * Validates the {@link TableConfigs} as provided in the tableConfigsStr json, by validating the schema, + * the realtime table config and the offline table config + */ + @POST + @Path("/tableConfigs/validate") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Validate the TableConfigs", notes = "Validate the TableConfigs") + public String validateConfig(String tableConfigsStr) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs json string: %s", tableConfigsStr), + Response.Status.BAD_REQUEST, e); + } + return validateConfig(tableConfigs); + } + + private void tuneConfig(TableConfig tableConfig, Schema schema) { + TableConfigUtils.applyTunerConfig(tableConfig, schema); + TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); + TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); + } + + private String validateConfig(TableConfigs tableConfigs) { + String tableName = tableConfigs.getTableName(); + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + try { + Preconditions.checkState(offlineTableConfig != null || realtimeTableConfig != null, + "Must provide at least one of 'realtime' or 'offline' table configs for adding TableConfigs: %s", tableName); + Preconditions.checkState(schema != null, "Must provide 'schema' for adding TableConfigs: %s", tableName); + Preconditions.checkState(!tableName.isEmpty(), "'tableName' cannot be empty in TableConfigs"); + + Preconditions + .checkState(tableName.equals(schema.getSchemaName()), "'tableName': %s must be equal to 'schemaName' from 'schema': %s", + tableName, schema.getSchemaName()); + SchemaUtils.validate(schema); + + if (offlineTableConfig != null) { + String rawTableName = TableNameBuilder.extractRawTableName(offlineTableConfig.getTableName()); + Preconditions.checkState(rawTableName.equals(tableName), + "Name in 'offline' table config: %s must be equal to 'tableName': %s", rawTableName, tableName); + TableConfigUtils.validateTableName(offlineTableConfig); + TableConfigUtils.validate(offlineTableConfig, schema); + } + if (realtimeTableConfig != null) { + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableConfig.getTableName()); + Preconditions.checkState(rawTableName.equals(tableName), + "Name in 'realtime' table config: %s must be equal to 'tableName': %s", rawTableName, tableName); + TableConfigUtils.validateTableName(realtimeTableConfig); + TableConfigUtils.validate(realtimeTableConfig, schema); + } + TableConfigUtils.verifyHybridTableConfigs(tableName, offlineTableConfig, realtimeTableConfig); + + return tableConfigs.toPrettyJsonString(); Review comment: ```suggestion return tableConfigs.toJsonString(); ``` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -580,4 +616,124 @@ private static IndexingConfig sanitizeIndexingConfig(IndexingConfig indexingConf } return null; } + + /** + * Apply TunerConfig to the tableConfig + */ + public static void applyTunerConfig(TableConfig tableConfig, Schema schema) { + TunerConfig tunerConfig = tableConfig.getTunerConfig(); + if (tunerConfig != null && tunerConfig.getName() != null && !tunerConfig.getName().isEmpty()) { + TableConfigTuner tuner = TableConfigTunerRegistry.getTuner(tunerConfig.getName()); + tuner.init(tunerConfig, schema); + tuner.apply(tableConfig); + } + } + + /** + * Ensure that the table config has the minimum number of replicas set as per cluster configs. + * If is doesn't, set the required amount of replication in the table config + */ + public static void ensureMinReplicas(TableConfig tableConfig, int defaultTableMinReplicas) { + // For self-serviced cluster, ensure that the tables are created with at least min replication factor irrespective + // of table configuration value + SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig(); + boolean verifyReplicasPerPartition; + boolean verifyReplication; + + try { + verifyReplicasPerPartition = ReplicationUtils.useReplicasPerPartition(tableConfig); + verifyReplication = ReplicationUtils.useReplication(tableConfig); + } catch (Exception e) { + throw new IllegalStateException(String.format("Invalid tableIndexConfig or streamConfig: %s", e.getMessage()), e); + } + + if (verifyReplication) { + int requestReplication; + try { + requestReplication = segmentsConfig.getReplicationNumber(); + if (requestReplication < defaultTableMinReplicas) { + LOGGER.info("Creating table with minimum replication factor of: {} instead of requested replication: {}", + defaultTableMinReplicas, requestReplication); + segmentsConfig.setReplication(String.valueOf(defaultTableMinReplicas)); + } + } catch (NumberFormatException e) { + throw new IllegalStateException("Invalid replication number", e); + } + } + + if (verifyReplicasPerPartition) { + String replicasPerPartitionStr = segmentsConfig.getReplicasPerPartition(); + if (replicasPerPartitionStr == null) { + throw new IllegalStateException("Field replicasPerPartition needs to be specified"); + } + try { + int replicasPerPartition = Integer.parseInt(replicasPerPartitionStr); + if (replicasPerPartition < defaultTableMinReplicas) { + LOGGER.info( + "Creating table with minimum replicasPerPartition of: {} instead of requested replicasPerPartition: {}", + defaultTableMinReplicas, replicasPerPartition); + segmentsConfig.setReplicasPerPartition(String.valueOf(defaultTableMinReplicas)); + } + } catch (NumberFormatException e) { + throw new IllegalStateException("Invalid value for replicasPerPartition: '" + replicasPerPartitionStr + "'", e); + } + } + } + + /** + * Ensure the table config has storage quota set as per cluster configs. + * If it doesn't, set the quota config into the table config + */ + public static void ensureStorageQuotaConstraints(TableConfig tableConfig, String maxAllowedSize) { + // Dim tables must adhere to cluster level storage size limits + if (tableConfig.isDimTable()) { + QuotaConfig quotaConfig = tableConfig.getQuotaConfig(); + long maxAllowedSizeInBytes = DataSizeUtils.toBytes(maxAllowedSize); + + if (quotaConfig == null) { + // set a default storage quota + tableConfig.setQuotaConfig(new QuotaConfig(maxAllowedSize, null)); + LOGGER.info("Assigning default storage quota ({}) for dimension table: {}", maxAllowedSize, + tableConfig.getTableName()); + } else { + if (quotaConfig.getStorage() == null) { + // set a default storage quota and keep the RPS value + tableConfig.setQuotaConfig(new QuotaConfig(maxAllowedSize, quotaConfig.getMaxQueriesPerSecond())); + LOGGER.info("Assigning default storage quota ({}) for dimension table: {}", maxAllowedSize, + tableConfig.getTableName()); + } else { + if (quotaConfig.getStorageInBytes() > maxAllowedSizeInBytes) { + throw new IllegalStateException(String + .format("Invalid storage quota: %d, max allowed size: %d", quotaConfig.getStorageInBytes(), + maxAllowedSizeInBytes)); + } + } + } + } + } + + /** + * Consistency checks across the offline and realtime counterparts of a hybrid table + */ + public static void verifyHybridTableConfigs(String rawTableName, TableConfig offlineTableConfig, + TableConfig realtimeTableConfig) { + if (offlineTableConfig == null || realtimeTableConfig == null) { + return; + } + + LOGGER.info("Validating realtime and offline configs for the hybrid table: {}", rawTableName); + String offlineRawTableName = TableNameBuilder.extractRawTableName(offlineTableConfig.getTableName()); + String realtimeRawTableName = TableNameBuilder.extractRawTableName(realtimeTableConfig.getTableName()); + Preconditions.checkState(offlineRawTableName.equals(realtimeRawTableName), + "Raw table name for offline table: %s does not match raw table name for realtime table: %s"); Review comment: These validation are already performed on the caller side. We can only validate the cross table part (time column match) ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Creates a {@link TableConfigs} using the <code>tableConfigsStr</code>, by creating the schema, + * followed by the realtime tableConfig and offline tableConfig as applicable, from the {@link TableConfigs}. + * Validates the configs before applying. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json", notes = "Add the TableConfigs using the tableConfigsStr json") + public SuccessResponse addConfig(String tableConfigsStr, @Context HttpHeaders httpHeaders, @Context Request request) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + String endpointUrl = request.getRequestURL().toString(); + validatePermissions(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + validatePermissions(offlineTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + validatePermissions(realtimeTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + + _pinotHelixResourceManager.addSchema(schema, true); + LOGGER.info("Added schema: {}", schema.getSchemaName()); + if (offlineTableConfig != null) { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Added offline table config: {}", offlineTableConfig.getTableName()); + } + if (realtimeTableConfig != null) { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Added realtime table config: {}", realtimeTableConfig.getTableName()); + } + + return new SuccessResponse("TableConfigs " + tableConfigs.getTableName() + " successfully added"); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_ADD_ERROR, 1L); + if (e instanceof PinotHelixResourceManager.InvalidTableConfigException) { + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs: %s", tableConfigs.getTableName()), Response.Status.BAD_REQUEST, e); + } else if (e instanceof PinotHelixResourceManager.TableAlreadyExistsException) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } else { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + /** + * Deletes the {@link TableConfigs} by deleting the schema tableName, the offline table config for tableName_OFFLINE and + * the realtime table config for tableName_REALTIME + */ + @DELETE + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.DELETE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete the TableConfigs", notes = "Delete the TableConfigs") + public SuccessResponse deleteConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + _pinotHelixResourceManager.deleteRealtimeTable(tableName); + LOGGER.info("Deleted realtime table: {}", tableName); + _pinotHelixResourceManager.deleteOfflineTable(tableName); + LOGGER.info("Deleted offline table: {}", tableName); + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + if (schema != null) { + _pinotHelixResourceManager.deleteSchema(schema); + } + LOGGER.info("Deleted schema: {}", tableName); + return new SuccessResponse("Deleted TableConfigs: " + tableName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Updated the {@link TableConfigs} by updating the schema tableName, + * then updating the offline tableConfig or creating a new one if it doesn't already exist in the cluster, + * then updating the realtime tableConfig or creating a new one if it doesn't already exist in the cluster. + */ + @PUT + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.UPDATE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Update the TableConfigs provided by the tableConfigsStr json", notes = "Update the TableConfigs provided by the tableConfigsStr json") + public SuccessResponse updateConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Reload the table if the new schema is backward compatible") @DefaultValue("false") @QueryParam("reload") boolean reload, + String tableConfigsStr) + throws Exception { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + Preconditions.checkState(tableConfigs.getTableName().equals(tableName), + "'tableName' in TableConfigs: %s must match provided tableName: %s", tableConfigs.getTableName(), tableName); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs: %s", tableName), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + _pinotHelixResourceManager.updateSchema(schema, reload); + LOGGER.info("Updated schema: {}", tableName); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + if (_pinotHelixResourceManager.hasOfflineTable(tableName)) { + _pinotHelixResourceManager.updateTableConfig(offlineTableConfig); + LOGGER.info("Updated offline table config: {}", tableName); + } else { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Created offline table config: {}", tableName); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) { + _pinotHelixResourceManager.updateTableConfig(realtimeTableConfig); + LOGGER.info("Updated realtime table config: {}", tableName); + } else { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Created realtime table config: {}", tableName); + } + } + } + } catch (PinotHelixResourceManager.InvalidTableConfigException e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L); + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs for: %s, %s", tableName, e.getMessage()), Response.Status.BAD_REQUEST, e); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L); + throw new ControllerApplicationException(LOGGER, + String.format("Failed to update TableConfigs for: %s, %s", tableName, e.getMessage()), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + + return new SuccessResponse("TableConfigs updated for " + tableName); + } + + /** + * Validates the {@link TableConfigs} as provided in the tableConfigsStr json, by validating the schema, + * the realtime table config and the offline table config + */ + @POST + @Path("/tableConfigs/validate") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Validate the TableConfigs", notes = "Validate the TableConfigs") + public String validateConfig(String tableConfigsStr) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs json string: %s", tableConfigsStr), + Response.Status.BAD_REQUEST, e); + } + return validateConfig(tableConfigs); + } + + private void tuneConfig(TableConfig tableConfig, Schema schema) { + TableConfigUtils.applyTunerConfig(tableConfig, schema); + TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); + TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); + } + + private String validateConfig(TableConfigs tableConfigs) { + String tableName = tableConfigs.getTableName(); + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + try { + Preconditions.checkState(offlineTableConfig != null || realtimeTableConfig != null, + "Must provide at least one of 'realtime' or 'offline' table configs for adding TableConfigs: %s", tableName); + Preconditions.checkState(schema != null, "Must provide 'schema' for adding TableConfigs: %s", tableName); + Preconditions.checkState(!tableName.isEmpty(), "'tableName' cannot be empty in TableConfigs"); + + Preconditions + .checkState(tableName.equals(schema.getSchemaName()), "'tableName': %s must be equal to 'schemaName' from 'schema': %s", + tableName, schema.getSchemaName()); + SchemaUtils.validate(schema); + + if (offlineTableConfig != null) { + String rawTableName = TableNameBuilder.extractRawTableName(offlineTableConfig.getTableName()); + Preconditions.checkState(rawTableName.equals(tableName), + "Name in 'offline' table config: %s must be equal to 'tableName': %s", rawTableName, tableName); + TableConfigUtils.validateTableName(offlineTableConfig); + TableConfigUtils.validate(offlineTableConfig, schema); + } + if (realtimeTableConfig != null) { + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableConfig.getTableName()); + Preconditions.checkState(rawTableName.equals(tableName), + "Name in 'realtime' table config: %s must be equal to 'tableName': %s", rawTableName, tableName); + TableConfigUtils.validateTableName(realtimeTableConfig); + TableConfigUtils.validate(realtimeTableConfig, schema); + } + TableConfigUtils.verifyHybridTableConfigs(tableName, offlineTableConfig, realtimeTableConfig); Review comment: Check not null before calling this function ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Creates a {@link TableConfigs} using the <code>tableConfigsStr</code>, by creating the schema, + * followed by the realtime tableConfig and offline tableConfig as applicable, from the {@link TableConfigs}. + * Validates the configs before applying. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json", notes = "Add the TableConfigs using the tableConfigsStr json") + public SuccessResponse addConfig(String tableConfigsStr, @Context HttpHeaders httpHeaders, @Context Request request) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + String endpointUrl = request.getRequestURL().toString(); + validatePermissions(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + validatePermissions(offlineTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + validatePermissions(realtimeTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + + _pinotHelixResourceManager.addSchema(schema, true); Review comment: Check if the schema/table config already exist before posting them. If one of the posting failed, we should clean up the already posted configs. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Preconditions; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.access.AccessControlFactory; +import org.apache.pinot.controller.api.access.AccessControlUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.utils.SchemaUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.TableConfigs; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.LoggerFactory; + + +/** + * Endpoints for CRUD of {@link TableConfigs}. + * {@link TableConfigs} is a group of the offline table config, realtime table config and schema for the same tableName. + */ +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class TableConfigsRestletResource { + + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableConfigsRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + ControllerMetrics _controllerMetrics; + + @Inject + AccessControlFactory _accessControlFactory; + AccessControlUtils _accessControlUtils = new AccessControlUtils(); + + /** + * List all {@link TableConfigs}, where each is a group of the offline table config, realtime table config and schema for the same tableName. + * This is equivalent to a list of all raw table names + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Lists all TableConfigs in cluster", notes = "Lists all TableConfigs in cluster") + public String listConfigs() { + try { + List<String> rawTableNames = _pinotHelixResourceManager.getAllRawTables(); + Collections.sort(rawTableNames); + + ArrayNode configsList = JsonUtils.newArrayNode(); + for (String rawTableName : rawTableNames) { + configsList.add(rawTableName); + } + return configsList.toString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Gets the {@link TableConfigs} for the provided raw tableName, by fetching the offline table config for tableName_OFFLINE, + * realtime table config for tableName_REALTIME and schema for tableName + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get the TableConfigs for a given raw tableName", notes = "Get the TableConfigs for a given raw tableName") + public String getConfig( + @ApiParam(value = "Raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName); + TableConfig realtimeTableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName); + TableConfigs config = new TableConfigs(tableName, schema, offlineTableConfig, realtimeTableConfig); + return config.toPrettyJsonString(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Creates a {@link TableConfigs} using the <code>tableConfigsStr</code>, by creating the schema, + * followed by the realtime tableConfig and offline tableConfig as applicable, from the {@link TableConfigs}. + * Validates the configs before applying. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tableConfigs") + @ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json", notes = "Add the TableConfigs using the tableConfigsStr json") + public SuccessResponse addConfig(String tableConfigsStr, @Context HttpHeaders httpHeaders, @Context Request request) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + String endpointUrl = request.getRequestURL().toString(); + validatePermissions(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + validatePermissions(offlineTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + validatePermissions(realtimeTableConfig.getTableName(), AccessType.CREATE, httpHeaders, endpointUrl); + } + + _pinotHelixResourceManager.addSchema(schema, true); + LOGGER.info("Added schema: {}", schema.getSchemaName()); + if (offlineTableConfig != null) { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Added offline table config: {}", offlineTableConfig.getTableName()); + } + if (realtimeTableConfig != null) { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Added realtime table config: {}", realtimeTableConfig.getTableName()); + } + + return new SuccessResponse("TableConfigs " + tableConfigs.getTableName() + " successfully added"); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_ADD_ERROR, 1L); + if (e instanceof PinotHelixResourceManager.InvalidTableConfigException) { + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs: %s", tableConfigs.getTableName()), Response.Status.BAD_REQUEST, e); + } else if (e instanceof PinotHelixResourceManager.TableAlreadyExistsException) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } else { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + /** + * Deletes the {@link TableConfigs} by deleting the schema tableName, the offline table config for tableName_OFFLINE and + * the realtime table config for tableName_REALTIME + */ + @DELETE + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.DELETE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete the TableConfigs", notes = "Delete the TableConfigs") + public SuccessResponse deleteConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName) { + + try { + _pinotHelixResourceManager.deleteRealtimeTable(tableName); + LOGGER.info("Deleted realtime table: {}", tableName); + _pinotHelixResourceManager.deleteOfflineTable(tableName); + LOGGER.info("Deleted offline table: {}", tableName); + Schema schema = _pinotHelixResourceManager.getSchema(tableName); + if (schema != null) { + _pinotHelixResourceManager.deleteSchema(schema); + } + LOGGER.info("Deleted schema: {}", tableName); + return new SuccessResponse("Deleted TableConfigs: " + tableName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Updated the {@link TableConfigs} by updating the schema tableName, + * then updating the offline tableConfig or creating a new one if it doesn't already exist in the cluster, + * then updating the realtime tableConfig or creating a new one if it doesn't already exist in the cluster. + */ + @PUT + @Path("/tableConfigs/{tableName}") + @Authenticate(AccessType.UPDATE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Update the TableConfigs provided by the tableConfigsStr json", notes = "Update the TableConfigs provided by the tableConfigsStr json") + public SuccessResponse updateConfig( + @ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Reload the table if the new schema is backward compatible") @DefaultValue("false") @QueryParam("reload") boolean reload, + String tableConfigsStr) + throws Exception { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + Preconditions.checkState(tableConfigs.getTableName().equals(tableName), + "'tableName' in TableConfigs: %s must match provided tableName: %s", tableConfigs.getTableName(), tableName); + validateConfig(tableConfigs); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs: %s", tableName), + Response.Status.BAD_REQUEST, e); + } + + TableConfig offlineTableConfig = tableConfigs.getOffline(); + TableConfig realtimeTableConfig = tableConfigs.getRealtime(); + Schema schema = tableConfigs.getSchema(); + + try { + _pinotHelixResourceManager.updateSchema(schema, reload); + LOGGER.info("Updated schema: {}", tableName); + + if (offlineTableConfig != null) { + tuneConfig(offlineTableConfig, schema); + if (_pinotHelixResourceManager.hasOfflineTable(tableName)) { + _pinotHelixResourceManager.updateTableConfig(offlineTableConfig); + LOGGER.info("Updated offline table config: {}", tableName); + } else { + _pinotHelixResourceManager.addTable(offlineTableConfig); + LOGGER.info("Created offline table config: {}", tableName); + } + if (realtimeTableConfig != null) { + tuneConfig(realtimeTableConfig, schema); + if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) { + _pinotHelixResourceManager.updateTableConfig(realtimeTableConfig); + LOGGER.info("Updated realtime table config: {}", tableName); + } else { + _pinotHelixResourceManager.addTable(realtimeTableConfig); + LOGGER.info("Created realtime table config: {}", tableName); + } + } + } + } catch (PinotHelixResourceManager.InvalidTableConfigException e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L); + throw new ControllerApplicationException(LOGGER, + String.format("Invalid TableConfigs for: %s, %s", tableName, e.getMessage()), Response.Status.BAD_REQUEST, e); + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L); + throw new ControllerApplicationException(LOGGER, + String.format("Failed to update TableConfigs for: %s, %s", tableName, e.getMessage()), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + + return new SuccessResponse("TableConfigs updated for " + tableName); + } + + /** + * Validates the {@link TableConfigs} as provided in the tableConfigsStr json, by validating the schema, + * the realtime table config and the offline table config + */ + @POST + @Path("/tableConfigs/validate") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Validate the TableConfigs", notes = "Validate the TableConfigs") + public String validateConfig(String tableConfigsStr) { + TableConfigs tableConfigs; + try { + tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class); + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs json string: %s", tableConfigsStr), + Response.Status.BAD_REQUEST, e); + } + return validateConfig(tableConfigs); + } + + private void tuneConfig(TableConfig tableConfig, Schema schema) { + TableConfigUtils.applyTunerConfig(tableConfig, schema); + TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); + TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); + } + + private String validateConfig(TableConfigs tableConfigs) { + String tableName = tableConfigs.getTableName(); Review comment: (nit) `rawTableName` for clarity -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org