Jackie-Jiang commented on code in PR #8355:
URL: https://github.com/apache/pinot/pull/8355#discussion_r861391683


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based 
on unique key combinations.
+   * It uses the following formula:
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information collected from Kafka or a 
staging Pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality (required), the number of unique primary key combinations.
+   * -primaryKeySize, used for calculating BytesPerKey.
+   * -comparisonColSize, used for calculating BytesPerValue.
+   * -partitionNums (optional), used for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column 
stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) 
@QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") 
@QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") 
@QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, 
TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", 
tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = 
schema.getFieldSpecFor(primaryKey).getDataType();
+        if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || 
dt == FieldSpec.DataType.MAP) {
+          String msg = "Not support data types for primary key columns";
+          throw new ControllerApplicationException(LOGGER, msg, 
Response.Status.BAD_REQUEST);
+        } else if (dt == FieldSpec.DataType.STRING) {
+          String msg = "Missing primary key sizes for String columns";
+          throw new ControllerApplicationException(LOGGER, msg, 
Response.Status.BAD_REQUEST);
+        } else {
+          bytesPerKey += dt.size();
+        }
+      }
+      // Java has a 24 bytes array overhead and there's also 8 bytes for the 
actual array object
+      bytesPerKey += 32;
+    }
+
+    // Estimated value space, it contains <segmentName, DocId, 
ComparisonValue(timestamp)> and overhead.
+    int bytesPerValue = 64;
+    String comparisonColumn = 
tableConfig.getUpsertConfig().getComparisonColumn();
+    if (comparisonColumn != null) {
+      FieldSpec.DataType dt = 
schema.getFieldSpecFor(comparisonColumn).getDataType();
+      if (dt == FieldSpec.DataType.STRING || dt == FieldSpec.DataType.JSON || 
dt == FieldSpec.DataType.LIST
+          || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for the comparison column";
+        throw new ControllerApplicationException(LOGGER, msg, 
Response.Status.BAD_REQUEST);
+      } else {
+        bytesPerValue = 52 + dt.size();
+      }
+    }
+
+    resultData.put("bytesPerKey", bytesPerKey);
+    resultData.put("bytesPerValue", bytesPerValue);
+
+    long primaryKeyCardinality = Long.valueOf(cardinality);
+    long totalKeySpace = bytesPerKey * primaryKeyCardinality;
+    long totalValueSpace = bytesPerValue * primaryKeyCardinality;
+    long totalSpace = totalKeySpace + totalValueSpace;

Review Comment:
   In order to make more accurate estimation, we should count the object 
overhead because as you can see, the object overhead can be larger than the 
actual content within the entry.
   I don't know the formula for computing object overhead from the map size, so 
we need to do some research here. If you find that it is too hard, we may add a 
TODO and address it later. But users should know this estimate does not include 
the overhead, so it can be quite off.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to