Jackie-Jiang commented on code in PR #8355:
URL: https://github.com/apache/pinot/pull/8355#discussion_r842244228


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.ColumnStats;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey
+   * -comparisonColSize, it uses for calculating BytesPerValue
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(@QueryParam("tableConfig") String tableConfigStr,

Review Comment:
   This API is built as a standalone tool rather than a regular restlet API,
   since it doesn't rely on any information stored in the cluster. We should at
   least read the instance config info, calculate the partitions on each server,
   and estimate the heap usage per server.
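
   A minimal sketch of what a per-server estimate could look like, assuming
   the partition count per server has already been read from the cluster. The
   class `PerServerHeapEstimate`, its `estimate` method, and the server names
   are hypothetical and not part of this PR; the sketch also assumes keys are
   spread evenly across partitions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the PR): splits a table-level upsert
// estimate across servers by the number of partitions each server hosts.
final class PerServerHeapEstimate {
  private PerServerHeapEstimate() {
  }

  /**
   * @param totalKeys unique primary keys in the whole table (the cardinality stat)
   * @param bytesPerEntry bytesPerKey + bytesPerValue
   * @param numPartitions total number of partitions of the table
   * @param partitionsPerServer server name to number of partitions it hosts
   *                            (assumed to be derived from cluster instance info)
   */
  static Map<String, Long> estimate(long totalKeys, long bytesPerEntry, int numPartitions,
      Map<String, Integer> partitionsPerServer) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    Map<String, Long> heapBytesPerServer = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> entry : partitionsPerServer.entrySet()) {
      // Assumption: keys are evenly distributed across partitions.
      // Round up so the estimate stays on the conservative side.
      long keysOnServer = (totalKeys * entry.getValue() + numPartitions - 1) / numPartitions;
      heapBytesPerServer.put(entry.getKey(), keysOnServer * bytesPerEntry);
    }
    return heapBytesPerServer;
  }

  public static void main(String[] args) {
    Map<String, Integer> assignment = new LinkedHashMap<>();
    assignment.put("Server_1", 4);
    assignment.put("Server_2", 4);
    System.out.println(estimate(1_000_000L, 100L, 8, assignment));
  }
}
```

   With replication, the per-server partition counts add up to more than
   `numPartitions`, which is expected: each replica keeps its own copy of the
   key map.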



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnStats.java:
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+
+/*
+ * Container object for stats of Pinot columns, output by kafka sampler.
+ */
+public class ColumnStats {
+
+  public static final String CARDINALITY = "cardinality";
+  public static final String PRIMARY_KEY_SIZE = "primaryKeySize";
+  public static final String COMPARISON_COL_SIZE = "comparisonColSize";
+  public static final String PARTITIONS_NUM = "partitionsNums";

Review Comment:
   ```suggestion
     public static final String NUM_PARTITIONS = "numPartitions";
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.ColumnStats;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey
+   * -comparisonColSize, it uses for calculating BytesPerValue
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(@QueryParam("tableConfig") String tableConfigStr,
+      @QueryParam("schema") String tableSchemaStr, @QueryParam("columnStats") String columnStatsStr) {
+    TableConfig tableConfig;
+    Schema schema;
+    ColumnStats columnStats;
+    try {
+      columnStats = JsonUtils.stringToObject(columnStatsStr, ColumnStats.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid column stats json string: %s", columnStatsStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+
+    try {
+      tableConfig = JsonUtils.stringToObject(tableConfigStr, TableConfig.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid table config json string: %s", tableConfigStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+    try {
+      schema = JsonUtils.stringToObject(tableSchemaStr, Schema.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid table schema json string: %s", tableSchemaStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+
+    int bytesPerKey = 0;
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+    // Estimated key space, it contains primary key columns
+    for (String primaryKey : primaryKeys) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+      if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for primary key columns";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else if (dt == FieldSpec.DataType.STRING) {
+        bytesPerKey += columnStats.getPrimaryKeySize();
+      } else {
+        bytesPerKey += dt.size();

Review Comment:
   We should probably also count the array size?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.ColumnStats;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey
+   * -comparisonColSize, it uses for calculating BytesPerValue
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(@QueryParam("tableConfig") String tableConfigStr,
+      @QueryParam("schema") String tableSchemaStr, @QueryParam("columnStats") String columnStatsStr) {
+    TableConfig tableConfig;
+    Schema schema;
+    ColumnStats columnStats;
+    try {
+      columnStats = JsonUtils.stringToObject(columnStatsStr, ColumnStats.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid column stats json string: %s", columnStatsStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+
+    try {
+      tableConfig = JsonUtils.stringToObject(tableConfigStr, TableConfig.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid table config json string: %s", tableConfigStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+    try {
+      schema = JsonUtils.stringToObject(tableSchemaStr, Schema.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid table schema json string: %s", tableSchemaStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+
+    int bytesPerKey = 0;
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+    // Estimated key space, it contains primary key columns
+    for (String primaryKey : primaryKeys) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+      if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for primary key columns";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else if (dt == FieldSpec.DataType.STRING) {
+        bytesPerKey += columnStats.getPrimaryKeySize();

Review Comment:
   If the primary key size is provided, we should skip this calculation and
   directly use the provided value.
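
   Roughly what this could look like, as a sketch only. `KeySizeEstimate`,
   `FixedWidthType`, and the nullable `providedPrimaryKeySize` parameter are
   hypothetical stand-ins for illustration; they are not the PR's
   `ColumnStats` or Pinot's `FieldSpec.DataType`.

```java
import java.util.List;

// Hypothetical sketch (not the PR's code): prefer a caller-supplied primary
// key size and only fall back to summing column widths when none is given.
final class KeySizeEstimate {
  private KeySizeEstimate() {
  }

  // Stand-in for fixed-width column types; widths are Java primitive sizes.
  enum FixedWidthType {
    INT(Integer.BYTES), LONG(Long.BYTES), FLOAT(Float.BYTES), DOUBLE(Double.BYTES);

    final int _bytes;

    FixedWidthType(int bytes) {
      _bytes = bytes;
    }
  }

  /**
   * @param providedPrimaryKeySize size from the stats payload, or null when absent
   * @param keyColumnTypes types of the primary key columns (fixed width only here)
   */
  static int bytesPerKey(Integer providedPrimaryKeySize, List<FixedWidthType> keyColumnTypes) {
    if (providedPrimaryKeySize != null && providedPrimaryKeySize > 0) {
      // The provided value already describes the whole key, so skip the per-column loop.
      return providedPrimaryKeySize;
    }
    int bytes = 0;
    for (FixedWidthType type : keyColumnTypes) {
      bytes += type._bytes;
    }
    return bytes;
  }

  public static void main(String[] args) {
    List<FixedWidthType> types = List.of(FixedWidthType.INT, FixedWidthType.LONG);
    System.out.println(bytesPerKey(24, types));
    System.out.println(bytesPerKey(null, types));
  }
}
```

   Variable-width key columns such as STRING are left out of the fallback
   path on purpose: without a provided size there is nothing to sum for them.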



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.ColumnStats;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey
+   * -comparisonColSize, it uses for calculating BytesPerValue
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(@QueryParam("tableConfig") String tableConfigStr,
+      @QueryParam("schema") String tableSchemaStr, @QueryParam("columnStats") String columnStatsStr) {
+    TableConfig tableConfig;
+    Schema schema;
+    ColumnStats columnStats;
+    try {
+      columnStats = JsonUtils.stringToObject(columnStatsStr, ColumnStats.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid column stats json string: %s", columnStatsStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+
+    try {
+      tableConfig = JsonUtils.stringToObject(tableConfigStr, TableConfig.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid table config json string: %s", tableConfigStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+    try {
+      schema = JsonUtils.stringToObject(tableSchemaStr, Schema.class);
+    } catch (IOException e) {
+      String msg = String.format("Invalid table schema json string: %s", tableSchemaStr);
+      throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
+    }
+
+    int bytesPerKey = 0;
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+    // Estimated key space, it contains primary key columns
+    for (String primaryKey : primaryKeys) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+      if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for primary key columns";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else if (dt == FieldSpec.DataType.STRING) {
+        bytesPerKey += columnStats.getPrimaryKeySize();
+      } else {
+        bytesPerKey += dt.size();
+      }
+    }
+    // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)>
+    int bytesPerValue =
+        tableConfig.getUpsertConfig().getComparisonColumn() != null ? 52 + columnStats.getComparisonColSize() : 64;

Review Comment:
   If the comparison column is fixed width, there is no need to provide its
   size in `columnStats`.
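
   One possible shape for this, as a sketch. The constants 52 and 64 are
   copied from the diff above; `ValueSizeEstimate`, its parameters, and the
   use of `OptionalInt` are hypothetical and only meant to show the order of
   the checks.

```java
import java.util.OptionalInt;

// Hypothetical sketch (not the PR's code): only fall back to the stats
// payload when the comparison column has no fixed width.
final class ValueSizeEstimate {
  // Both constants are taken as-is from the diff under review.
  static final int BASE_VALUE_BYTES = 52;
  static final int DEFAULT_VALUE_BYTES = 64;

  private ValueSizeEstimate() {
  }

  /**
   * @param hasComparisonColumn whether the upsert config names a comparison column
   * @param fixedWidthBytes width of the comparison column when its type is fixed width
   * @param comparisonColSizeFromStats size supplied in the stats payload, if any
   */
  static int bytesPerValue(boolean hasComparisonColumn, OptionalInt fixedWidthBytes,
      OptionalInt comparisonColSizeFromStats) {
    if (!hasComparisonColumn) {
      return DEFAULT_VALUE_BYTES;
    }
    if (fixedWidthBytes.isPresent()) {
      // Fixed-width type: the size is known from the schema, stats are not needed.
      return BASE_VALUE_BYTES + fixedWidthBytes.getAsInt();
    }
    if (comparisonColSizeFromStats.isPresent()) {
      return BASE_VALUE_BYTES + comparisonColSizeFromStats.getAsInt();
    }
    throw new IllegalArgumentException(
        "comparisonColSize is required for a variable-width comparison column");
  }

  public static void main(String[] args) {
    System.out.println(bytesPerValue(true, OptionalInt.of(Long.BYTES), OptionalInt.empty()));
  }
}
```

   Rejecting the request when neither size is available keeps the estimate
   from silently under-counting the value space.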



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


