Jackie-Jiang commented on code in PR #10463:
URL: https://github.com/apache/pinot/pull/10463#discussion_r1232841016


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -425,7 +425,7 @@ public Response downloadSegment(
   public Response downloadValidDocIds(
       @ApiParam(value = "Name of the table with type REALTIME", required = true, example = "myTable_REALTIME")
       @PathParam("tableNameWithType") String tableNameWithType,
-      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,

Review Comment:
   I think we still need the `@Encoded` annotation to handle segment names that
contain special characters. The same applies to the newly added API.
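
To illustrate one way the concern can show up, here is a minimal sketch, not Pinot code. In JAX-RS, `@Encoded` disables the automatic decoding of the path parameter, so the handler receives the raw value and can decode it exactly once. The sketch assumes a hypothetical segment name that literally contains `%2B`, and it uses `URLEncoder`/`URLDecoder` (form encoding, Java 10+ overloads) only as a stand-in for path encoding. It shows that decoding twice, for example once by the container and once by the handler, changes the name.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class EncodedSegmentNameSketch {
  // Stand-in for the client side: percent-encode the raw segment name.
  static String encode(String raw) {
    return URLEncoder.encode(raw, StandardCharsets.UTF_8);
  }

  // Stand-in for a single decoding step on the server side.
  static String decode(String encoded) {
    return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Hypothetical segment name that literally contains the characters "%2B".
    String segmentName = "mySegment%2Bv1";
    String onTheWire = encode(segmentName);

    // Decoding exactly once recovers the original name.
    System.out.println(decode(onTheWire).equals(segmentName));
    // Decoding a second time turns "%2B" into "+", so the lookup would miss.
    System.out.println(decode(decode(onTheWire)).equals(segmentName));
  }
}
```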



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -464,6 +464,56 @@ public Response downloadValidDocIds(
     }
   }
 
+  @GET
+  @Path("/tables/{tableNameWithType}/segments/{segmentName}/validDocIdMetadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provides segment validDocId metadata",
+      notes = "Provides segment validDocId metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class)
+  })
+  public String getValidDocIdMetadata(
+      @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Segment name", required = true) @PathParam("segmentName") String segmentName) {

Review Comment:
   We should probably accept a list of segments to reduce the number of API
calls from the task generator



##########
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java:
##########
@@ -136,4 +136,18 @@ public static class SegmentGenerationAndPushTask {
     public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
         "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
   }
+
+  public static class UpsertCompactionTask {
+    public static final String TASK_TYPE = "UpsertCompactionTask";
+    /**
+     * The time period to wait before picking segments for this task
+     * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+    /**
+     * The maximum percent of old records allowed for a completed segment.
+     * e.g. if the percent surpasses 30, then the segment will be compacted
+     */
+    public static final String INVALID_RECORDS_THRESHOLD_PERCENT = "invalidRecordsThresholdPercent";

Review Comment:
   I'm not sure whether we want a percentage threshold or an absolute value. A
percentage can be very aggressive when the segment is very small, e.g. 1 out of
3 records is already 33%.
   cc @snleee 
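
To make the trade-off concrete, a small sketch with made-up numbers (the helper names and the absolute limit of 1000 records are assumptions for illustration, not part of this PR). It compares a percentage threshold against an absolute count for a tiny segment and for a large one.

```java
final class CompactionThresholdSketch {
  // Percentage of invalid (old) records in a segment; 0 for an empty segment.
  static double invalidPercent(int totalDocs, int validDocs) {
    if (totalDocs <= 0) {
      return 0.0;
    }
    return 100.0 * (totalDocs - validDocs) / totalDocs;
  }

  // Percentage-based rule, as proposed in the PR (default 30).
  static boolean exceedsPercent(int totalDocs, int validDocs, double thresholdPercent) {
    return invalidPercent(totalDocs, validDocs) > thresholdPercent;
  }

  // Hypothetical alternative: absolute number of invalid records.
  static boolean exceedsCount(int totalDocs, int validDocs, int thresholdCount) {
    return totalDocs - validDocs > thresholdCount;
  }

  public static void main(String[] args) {
    // Tiny segment: 1 invalid record out of 3 already crosses 30%.
    System.out.println(exceedsPercent(3, 2, 30.0));
    System.out.println(exceedsCount(3, 2, 1000));
    // Large segment: 200,000 invalid records are only 20%.
    System.out.println(exceedsPercent(1_000_000, 800_000, 30.0));
    System.out.println(exceedsCount(1_000_000, 800_000, 1000));
  }
}
```

With these numbers the percentage rule selects the 3-record segment but skips the segment holding 200,000 invalid records, while the absolute rule does the opposite.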



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -464,6 +464,56 @@ public Response downloadValidDocIds(
     }
   }
 
+  @GET
+  @Path("/tables/{tableNameWithType}/segments/{segmentName}/validDocIdMetadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provides segment validDocId metadata",
+      notes = "Provides segment validDocId metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class)
+  })
+  public String getValidDocIdMetadata(
+      @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Segment name", required = true) @PathParam("segmentName") String segmentName) {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+    SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+          String.format("Table %s segment %s does not exist", tableNameWithType, segmentName),
+          Response.Status.NOT_FOUND);
+    }
+
+    try {
+      IndexSegment indexSegment = segmentDataManager.getSegment();
+      if (!(indexSegment instanceof ImmutableSegmentImpl)) {
+        throw new WebApplicationException(
+            String.format("Table %s segment %s is not a immutable segment", tableNameWithType, segmentName),
+            Response.Status.BAD_REQUEST);
+      }
+      MutableRoaringBitmap validDocIds =
+          indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (validDocIds == null) {
+        throw new WebApplicationException(
+            String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType, segmentName),
+            Response.Status.NOT_FOUND);
+      }
+      Map<String, Integer> validDocIdMetadata = new HashMap<>();
+      int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
+      int totalValidDocs = (int) validDocIds.stream().count();

Review Comment:
   ```suggestion
         int totalValidDocs = validDocIds.getCardinality();
   ```
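
For context, a rough analogy using `java.util.BitSet` from the JDK rather than RoaringBitmap (the bitmap type here is a stand-in so the sketch runs without extra dependencies). Both approaches return the same number; the difference is that the stream version visits every set bit and needs a cast from `long`, whereas the dedicated cardinality method returns an `int` directly.

```java
import java.util.BitSet;

final class CardinalitySketch {
  // Counts set bits by streaming over every index, then narrowing long to int.
  static int countViaStream(BitSet validDocIds) {
    return (int) validDocIds.stream().count();
  }

  // Asks the bitmap for its cardinality directly.
  static int countViaCardinality(BitSet validDocIds) {
    return validDocIds.cardinality();
  }

  public static void main(String[] args) {
    BitSet validDocIds = new BitSet();
    validDocIds.set(0);
    validDocIds.set(3);
    validDocIds.set(7);
    System.out.println(countViaStream(validDocIds));
    System.out.println(countViaCardinality(validDocIds));
  }
}
```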



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -527,6 +528,19 @@ static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
                   String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()));
             }
           }
+        } else if (taskTypeConfigName.equals(UPSERT_COMPACTION_TASK_TYPE)) {
+          // check table is realtime
+          Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+              "UpsertCompactionTask only supports realtime tables!");
+          // check upsert enabled
+          Preconditions.checkState(tableConfig.isUpsertEnabled(),
+              "Upsert must be enabled for UpsertCompactionTask");
+          // check no malformed period
+          TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", "2d"));

Review Comment:
   (minor) We don't need to do this verification if the config is not provided 
(no need to verify the default value)
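
Something along these lines, as a standalone sketch. `toMillis` is a hypothetical, simplified stand-in for `TimeUtils.convertPeriodToMillis` that only understands `<number>d`, `<number>h` and `<number>m`, and the class name is made up. The point is only that the period is parsed when the key is present and skipped otherwise.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class BufferPeriodValidationSketch {
  private static final Pattern PERIOD = Pattern.compile("(\\d+)([dhm])");

  // Simplified stand-in for a period parser; throws on malformed input.
  static long toMillis(String period) {
    Matcher matcher = PERIOD.matcher(period);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Invalid period: " + period);
    }
    long value = Long.parseLong(matcher.group(1));
    switch (matcher.group(2)) {
      case "d":
        return value * 24L * 60 * 60 * 1000;
      case "h":
        return value * 60L * 60 * 1000;
      default: // "m"
        return value * 60L * 1000;
    }
  }

  // Validates the period only if the user actually configured one.
  static void validate(Map<String, String> taskTypeConfig) {
    String bufferTimePeriod = taskTypeConfig.get("bufferTimePeriod");
    if (bufferTimePeriod != null) {
      toMillis(bufferTimePeriod);
    }
  }

  public static void main(String[] args) {
    validate(Map.of());                          // nothing configured, nothing to check
    validate(Map.of("bufferTimePeriod", "2d"));  // well-formed, passes
    try {
      validate(Map.of("bufferTimePeriod", "soon"));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```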



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java:
##########
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "7d";
+  private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 30.0;
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig: tableConfigs) {
+      if (!validate(tableConfig)) {
+        continue;
+      }
+
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {} for task: {}",
+          tableNameWithType, taskType);
+
+      Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+      Map<String, String> compactionConfigs = getCompactionConfigs(taskConfigs);
+      List<SegmentZKMetadata> completedSegments = getCompletedSegments(tableNameWithType, compactionConfigs);
+
+      if (completedSegments.isEmpty()) {
+        LOGGER.info("No completed segments were available for compaction for table: {}", tableNameWithType);
+        continue;
+      }
+
+      // get server to segment mappings
+      Map<String, List<String>> serverToSegments = _clusterInfoAccessor.getServerToSegmentsMap(tableNameWithType);
+      Map<String, String> segmentToServer = getSegmentToServer(serverToSegments);

Review Comment:
   Here we are converting back and forth: `getServerToSegmentsMap()` reads the
IS of the table and converts its segment -> servers map into a server ->
segments map, and `getSegmentToServer()` then converts it back. We should read
both the IS and EV of the table, find an online server for each segment, and
group the segments by server so that we only need to read validDocId metadata
from each server once
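
Roughly what I have in mind, as a self-contained sketch rather than actual Pinot code. The IS and EV are modelled as plain `segment -> (server -> state)` maps, and the class and method names are hypothetical. Each segment is assigned to the first server (in sorted order) that is `ONLINE` in both views, so the caller ends up with one segment list per server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SegmentServerGroupingSketch {
  // Groups segments by one server that is ONLINE in both ideal state and external view.
  // Segments without such a server are skipped.
  static Map<String, List<String>> groupByOnlineServer(
      Map<String, Map<String, String>> idealState,
      Map<String, Map<String, String>> externalView) {
    Map<String, List<String>> serverToSegments = new TreeMap<>();
    // TreeMap copies give a deterministic iteration order for the sketch.
    for (Map.Entry<String, Map<String, String>> segmentEntry : new TreeMap<>(idealState).entrySet()) {
      String segment = segmentEntry.getKey();
      Map<String, String> evStates = externalView.getOrDefault(segment, Map.of());
      for (Map.Entry<String, String> serverEntry : new TreeMap<>(segmentEntry.getValue()).entrySet()) {
        String server = serverEntry.getKey();
        if ("ONLINE".equals(serverEntry.getValue()) && "ONLINE".equals(evStates.get(server))) {
          serverToSegments.computeIfAbsent(server, k -> new ArrayList<>()).add(segment);
          break; // one server per segment is enough
        }
      }
    }
    return serverToSegments;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> idealState = Map.of(
        "seg1", Map.of("server1", "ONLINE", "server2", "ONLINE"),
        "seg2", Map.of("server1", "ONLINE", "server2", "ONLINE"),
        "seg3", Map.of("server2", "ONLINE"));
    Map<String, Map<String, String>> externalView = Map.of(
        "seg1", Map.of("server1", "ONLINE", "server2", "ONLINE"),
        "seg2", Map.of("server1", "OFFLINE", "server2", "ONLINE"));
    // seg3 has no entry in the external view, so it is not assigned to any server.
    System.out.println(groupByOnlineServer(idealState, externalView));
  }
}
```

The task generator could then issue one metadata request per key of the returned map, which pairs naturally with an endpoint that accepts a list of segments.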



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -489,14 +489,16 @@ static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema schema,
 
   @Override
   public void addSegment(ImmutableSegment immutableSegment) {
-    if (isUpsertEnabled()) {
-      handleUpsert(immutableSegment);
-      return;
-    }
+    if (immutableSegment instanceof ImmutableSegmentImpl) {

Review Comment:
   This can cause problems when we are using an empty segment to replace an 
existing segment. Is there a specific reason why you changed this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

