Jackie-Jiang commented on code in PR #10463:
URL: https://github.com/apache/pinot/pull/10463#discussion_r1271265598


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java:
##########
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.http.client.utils.URIBuilder;
+import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import 
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExecutor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);
+  private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager();
+  private static HelixAdmin _clusterManagementTool = 
_helixManager.getClusterManagmentTool();
+  private static String _clusterName = _helixManager.getClusterName();
+
+  private class CompactedRecordReader implements RecordReader {
+    private final PinotSegmentRecordReader _pinotSegmentRecordReader;
+    private final PeekableIntIterator _validDocIdsIterator;
+    // Reusable generic row to store the next row to return
+    GenericRow _nextRow = new GenericRow();
+    // Flag to mark whether we need to fetch another row
+    boolean _nextRowReturned = true;
+
+    CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) {
+      _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+      _pinotSegmentRecordReader.init(indexDir, null, null);
+      _validDocIdsIterator = validDocIds.getIntIterator();
+    }
+
+    @Override
+    public void init(File dataFile, Set<String> fieldsToRead, @Nullable 
RecordReaderConfig recordReaderConfig) {
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (!_validDocIdsIterator.hasNext() && _nextRowReturned) {
+        return false;
+      }
+
+      // If next row has not been returned, return true
+      if (!_nextRowReturned) {
+        return true;
+      }
+
+      // Try to get the next row to return
+      if (_validDocIdsIterator.hasNext()) {
+        int docId = _validDocIdsIterator.next();
+        _nextRow.clear();
+        _pinotSegmentRecordReader.getRecord(docId, _nextRow);
+        _nextRowReturned = false;
+        return true;
+      }
+
+      // Cannot find next row to return, return false
+      return false;
+    }
+
+    @Override
+    public GenericRow next() {
+      return next(new GenericRow());
+    }
+
+    @Override
+    public GenericRow next(GenericRow reuse) {
+      Preconditions.checkState(!_nextRowReturned);
+      reuse.init(_nextRow);
+      _nextRowReturned = true;
+      return reuse;
+    }
+
+    @Override
+    public void rewind() {
+      _pinotSegmentRecordReader.rewind();
+      _nextRowReturned = true;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      _pinotSegmentRecordReader.close();
+    }
+  }
+
+  @Override
+  protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, 
File indexDir, File workingDir)
+      throws Exception {
+    _eventObserver.notifyProgress(pinotTaskConfig, "Compacting segment: " + 
indexDir);
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String taskType = pinotTaskConfig.getTaskType();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, 
configs);
+
+    if (validDocIds.isEmpty()) {
+      // prevents empty segment generation
+      LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: 
{}", tableNameWithType, segmentName);
+      if (indexDir.exists() && !FileUtils.deleteQuietly(indexDir)) {
+        LOGGER.warn("Failed to delete input segment: {}", 
indexDir.getAbsolutePath());
+      }
+      if (!FileUtils.deleteQuietly(workingDir)) {
+        LOGGER.warn("Failed to delete working directory: {}", 
workingDir.getAbsolutePath());
+      }
+      return new 
SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+          .build();
+    }
+
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    try (CompactedRecordReader compactedRecordReader = new 
CompactedRecordReader(indexDir, validDocIds)) {
+      SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, 
tableConfig, segmentMetadata, segmentName);
+      SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+      driver.init(config, compactedRecordReader);
+      driver.build();
+    }
+
+    File compactedSegmentFile = new File(workingDir, segmentName);
+    SegmentConversionResult result =
+        new 
SegmentConversionResult.Builder().setFile(compactedSegmentFile).setTableNameWithType(tableNameWithType)
+            .setSegmentName(segmentName).build();
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", 
taskType, configs, (endMillis - startMillis));
+
+    return result;
+  }
+
+  private static SegmentGeneratorConfig getSegmentGeneratorConfig(File 
workingDir, TableConfig tableConfig,
+      SegmentMetadataImpl segmentMetadata, String segmentName) {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
segmentMetadata.getSchema());
+    config.setOutDir(workingDir.getPath());
+    config.setSegmentName(segmentName);
+    // Keep index creation time the same as original segment because both 
segments use the same raw data.
+    // This way, for REFRESH case, when new segment gets pushed to controller, 
we can use index creation time to
+    // identify if the new pushed segment has newer data than the existing one.
+    
config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));
+
+    // The time column type info is not stored in the segment metadata.
+    // Keep segment start/end time to properly handle time column type other 
than EPOCH (e.g.SIMPLE_FORMAT).
+    if (segmentMetadata.getTimeInterval() != null) {
+      
config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
+      config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
+      config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
+      config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
+    }
+    return config;
+  }
+
+  // TODO: Consider moving this method to a more appropriate class (eg 
ServerSegmentMetadataReader)
+  private static ImmutableRoaringBitmap getValidDocIds(String 
tableNameWithType, Map<String, String> configs)
+      throws URISyntaxException {
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String server = getServer(segmentName, tableNameWithType);
+
+    // get the url for the validDocIds for the server
+    InstanceConfig instanceConfig = 
_clusterManagementTool.getInstanceConfig(_clusterName, server);
+    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+    String url =
+        new 
URIBuilder(endpoint).setPath(String.format("/segments/%s/%s/validDocIds", 
tableNameWithType, segmentName))
+            .toString();
+
+    // get the validDocIds from that server
+    Response response = 
ClientBuilder.newClient().target(url).request().get(Response.class);
+    Preconditions.checkState(response.getStatus() == 
Response.Status.OK.getStatusCode(),
+        "Unable to retrieve validDocIds from %s", url);
+    byte[] snapshot = response.readEntity(byte[].class);
+    ImmutableRoaringBitmap validDocIds = new 
ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot));
+    return validDocIds;
+  }
+
+  @VisibleForTesting
+  public static String getServer(String segmentName, String tableNameWithType) 
{
+    String server = null;
+    ExternalView externalView = 
_clusterManagementTool.getResourceExternalView(_clusterName, tableNameWithType);
+    if (externalView == null) {
+      throw new IllegalStateException("External view does not exist for table: 
" + tableNameWithType);
+    }
+    for (Map.Entry<String, Map<String, String>> entry : 
externalView.getRecord().getMapFields().entrySet()) {

Review Comment:
   We can simply look up the instance state map for the segment here instead of 
iterating over all the map fields. We also need to check the state of each 
server and pick one that is `ONLINE`:
   ```suggestion
     Map<String, String> instanceStateMap = 
externalView.getStateMap(segmentName);
     if (instanceStateMap == null) {
       throw new IllegalStateException("Failed to find segment: ...");
     }
     for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
       if (entry.getValue().equals(SegmentStateModel.ONLINE)) {
         return entry.getKey();
       }
     }
     throw new IllegalStateException("Failed to find ONLINE server for segment: ...");
   ```



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java:
##########
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "7d";
+  private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
+  private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0;
+
+  public static class SegmentSelectionResult {
+
+    private List<SegmentZKMetadata> _segmentsForCompaction;
+
+    private List<String> _segmentsForDeletion;
+
+    SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, 
List<String> segmentsForDeletion) {
+      _segmentsForCompaction = segmentsForCompaction;
+      _segmentsForDeletion = segmentsForDeletion;
+    }
+
+    public List<SegmentZKMetadata> getSegmentsForCompaction() {
+      return _segmentsForCompaction;
+    }
+
+    public List<String> getSegmentsForDeletion() {
+      return _segmentsForDeletion;
+    }
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      if (!validate(tableConfig)) {
+        continue;
+      }
+
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableNameWithType, taskType);
+
+      Map<String, String> taskConfigs = 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+      Map<String, String> compactionConfigs = 
getCompactionConfigs(taskConfigs);
+      List<SegmentZKMetadata> completedSegments = 
getCompletedSegments(tableNameWithType, compactionConfigs);
+
+      if (completedSegments.isEmpty()) {
+        LOGGER.info("No completed segments were eligible for compaction for 
table: {}", tableNameWithType);
+        continue;
+      }
+
+      // get server to segment mappings
+      Map<String, List<String>> serverToSegments = 
_clusterInfoAccessor.getServerToSegmentsMap(tableNameWithType);
+      PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
+      BiMap<String, String> serverToEndpoints;
+      try {
+        serverToEndpoints = 
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+      } catch (InvalidConfigException e) {
+        throw new RuntimeException(e);
+      }
+
+      Map<String, SegmentZKMetadata> completedSegmentsMap =
+          
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
+
+      List<String> validDocIdUrls;
+      try {
+        validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, 
serverToEndpoints, tableNameWithType,
+            completedSegmentsMap.keySet());
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+
+      // request the urls from the servers
+      CompletionServiceHelper completionServiceHelper =
+          new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), 
_clusterInfoAccessor.getConnectionManager(),
+              serverToEndpoints.inverse());
+
+      CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+          completionServiceHelper.doMultiGetRequest(validDocIdUrls, 
tableNameWithType, false, 3000);
+
+      SegmentSelectionResult segmentSelectionResult =
+          processValidDocIdMetadata(compactionConfigs, completedSegmentsMap, 
serviceResponse._httpResponses.entrySet());
+
+      if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
+        pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion(),
+            "0d");
+        LOGGER.info("Deleted segments containing only invalid records for 
table: {} for task: {}", tableNameWithType,
+            taskType);
+      }
+
+      int numTasks = 0;
+      int maxTasks = getMaxTasks(taskType, tableNameWithType, taskConfigs);
+      for (SegmentZKMetadata segment : 
segmentSelectionResult.getSegmentsForCompaction()) {
+        if (numTasks == maxTasks) {
+          break;
+        }
+        Map<String, String> configs = new HashMap<>();
+        configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
+        configs.put(MinionConstants.SEGMENT_NAME_KEY, 
segment.getSegmentName());
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segment.getDownloadUrl());
+        configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(segment.getCrc()));
+        pinotTaskConfigs.add(new 
PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
+        numTasks++;
+      }
+      LOGGER.info("Finished generating {} tasks configs for table: {} for 
task: {}", numTasks, tableNameWithType,
+          taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @VisibleForTesting
+  public static SegmentSelectionResult processValidDocIdMetadata(Map<String, 
String> compactionConfigs,
+      Map<String, SegmentZKMetadata> completedSegmentsMap, 
Set<Map.Entry<String, String>> responseSet) {
+    double invalidRecordsThresholdPercent = Double.parseDouble(
+        
compactionConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
+    long invalidRecordsThresholdCount = Long.parseLong(
+        
compactionConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
+    List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
+    List<String> segmentsForDeletion = new ArrayList<>();
+    for (Map.Entry<String, String> streamResponse : responseSet) {
+      JsonNode allValidDocIdMetadata;
+      try {
+        allValidDocIdMetadata = 
JsonUtils.stringToJsonNode(streamResponse.getValue());
+      } catch (IOException e) {
+        LOGGER.error("Unable to parse validDocIdMetadata response for: {}", 
streamResponse.getKey());
+        continue;
+      }
+      Iterator<JsonNode> iterator = allValidDocIdMetadata.elements();
+      while (iterator.hasNext()) {
+        JsonNode validDocIdMetadata = iterator.next();
+        long invalidRecordCount = 
validDocIdMetadata.get("totalInvalidDocs").asLong();
+        String segmentName = validDocIdMetadata.get("segmentName").asText();
+        SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+        double invalidRecordPercent = ((double) invalidRecordCount / 
segment.getTotalDocs()) * 100;
+        if (invalidRecordCount == segment.getTotalDocs()) {
+          segmentsForDeletion.add(segment.getSegmentName());
+        } else if (invalidRecordPercent > invalidRecordsThresholdPercent
+            && invalidRecordCount > invalidRecordsThresholdCount) {
+          segmentsForCompaction.add(segment);
+        }
+      }
+    }
+    return new SegmentSelectionResult(segmentsForCompaction, 
segmentsForDeletion);
+  }
+
+  @VisibleForTesting
+  public static List<String> getValidDocIdMetadataUrls(Map<String, 
List<String>> serverToSegments,
+      BiMap<String, String> serverToEndpoints, String tableNameWithType, 
Set<String> completedSegments)
+      throws URISyntaxException {
+    Set<String> remainingSegments = new HashSet<>(completedSegments);
+    List<String> urls = new ArrayList<>();
+    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
+      if (remainingSegments.isEmpty()) {
+        break;
+      }
+      String server = entry.getKey();
+      List<String> segmentNames = entry.getValue();
+      URIBuilder uriBuilder = new 
URIBuilder(serverToEndpoints.get(server)).setPath(
+          String.format("/tables/%s/validDocIdMetadata", tableNameWithType));
+      int completedSegmentCountPerServer = 0;
+      for (String segmentName : segmentNames) {
+        if (remainingSegments.contains(segmentName)) {
+          completedSegmentCountPerServer++;
+          uriBuilder.addParameter("segmentNames", segmentName);
+          remainingSegments.remove(segmentName);
+        }
+      }
+      if (completedSegmentCountPerServer > 0) {
+        // only add to the list if the server has completed segments
+        urls.add(uriBuilder.toString());
+      }
+    }
+    return urls;
+  }
+
+  private List<SegmentZKMetadata> getCompletedSegments(String 
tableNameWithType,
+      Map<String, String> compactionConfigs) {
+    List<SegmentZKMetadata> completedSegments = new ArrayList<>();
+    String bufferPeriod =
+        
compactionConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);
+    long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+    List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+    for (SegmentZKMetadata segment : allSegments) {
+      CommonConstants.Segment.Realtime.Status status = segment.getStatus();
+      // initial segments selection based on status and age
+      if (status.isCompleted() && (segment.getEndTimeMs() <= 
(System.currentTimeMillis() - bufferMs))) {
+        completedSegments.add(segment);
+      }
+    }
+    return completedSegments;
+  }
+
+  @VisibleForTesting
+  public static int getMaxTasks(String taskType, String tableNameWithType, 
Map<String, String> taskConfigs) {
+    int maxTasks = Integer.MAX_VALUE;
+    String tableMaxNumTasksConfig = 
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+    if (tableMaxNumTasksConfig != null) {
+      try {
+        maxTasks = Integer.parseInt(tableMaxNumTasksConfig);
+      } catch (Exception e) {
+        LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and 
task {}", tableNameWithType, taskType);
+      }
+    }
+    return maxTasks;
+  }
+
+  private static final String[] VALID_CONFIG_KEYS = {
+      UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, 
UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
+      UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT
+  };
+
+  private Map<String, String> getCompactionConfigs(Map<String, String> 
taskConfig) {

Review Comment:
   (minor) There is no need to extract this map. We can directly read the 
`taskConfig`



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java:
##########
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "7d";
+  private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
+  private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0;
+
+  public static class SegmentSelectionResult {
+
+    private List<SegmentZKMetadata> _segmentsForCompaction;
+
+    private List<String> _segmentsForDeletion;
+
+    SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, 
List<String> segmentsForDeletion) {
+      _segmentsForCompaction = segmentsForCompaction;
+      _segmentsForDeletion = segmentsForDeletion;
+    }
+
+    public List<SegmentZKMetadata> getSegmentsForCompaction() {
+      return _segmentsForCompaction;
+    }
+
+    public List<String> getSegmentsForDeletion() {
+      return _segmentsForDeletion;
+    }
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      if (!validate(tableConfig)) {
+        continue;
+      }
+
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableNameWithType, taskType);
+
+      Map<String, String> taskConfigs = 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+      Map<String, String> compactionConfigs = 
getCompactionConfigs(taskConfigs);
+      List<SegmentZKMetadata> completedSegments = 
getCompletedSegments(tableNameWithType, compactionConfigs);
+
+      if (completedSegments.isEmpty()) {
+        LOGGER.info("No completed segments were eligible for compaction for 
table: {}", tableNameWithType);
+        continue;
+      }
+
+      // get server to segment mappings
+      Map<String, List<String>> serverToSegments = 
_clusterInfoAccessor.getServerToSegmentsMap(tableNameWithType);
+      PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
+      BiMap<String, String> serverToEndpoints;
+      try {
+        serverToEndpoints = 
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+      } catch (InvalidConfigException e) {
+        throw new RuntimeException(e);
+      }
+
+      Map<String, SegmentZKMetadata> completedSegmentsMap =
+          
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
+
+      List<String> validDocIdUrls;
+      try {
+        validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, 
serverToEndpoints, tableNameWithType,
+            completedSegmentsMap.keySet());
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+
+      // request the urls from the servers
+      CompletionServiceHelper completionServiceHelper =
+          new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), 
_clusterInfoAccessor.getConnectionManager(),
+              serverToEndpoints.inverse());
+
+      CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+          completionServiceHelper.doMultiGetRequest(validDocIdUrls, 
tableNameWithType, false, 3000);
+
+      SegmentSelectionResult segmentSelectionResult =
+          processValidDocIdMetadata(compactionConfigs, completedSegmentsMap, 
serviceResponse._httpResponses.entrySet());
+
+      if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
+        pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion(),
+            "0d");
+        LOGGER.info("Deleted segments containing only invalid records for 
table: {} for task: {}", tableNameWithType,
+            taskType);
+      }
+
+      int numTasks = 0;
+      int maxTasks = getMaxTasks(taskType, tableNameWithType, taskConfigs);
+      for (SegmentZKMetadata segment : 
segmentSelectionResult.getSegmentsForCompaction()) {
+        if (numTasks == maxTasks) {
+          break;
+        }
+        Map<String, String> configs = new HashMap<>();
+        configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
+        configs.put(MinionConstants.SEGMENT_NAME_KEY, 
segment.getSegmentName());
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segment.getDownloadUrl());
+        configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(segment.getCrc()));
+        pinotTaskConfigs.add(new 
PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
+        numTasks++;
+      }
+      LOGGER.info("Finished generating {} tasks configs for table: {} for 
task: {}", numTasks, tableNameWithType,
+          taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @VisibleForTesting
+  public static SegmentSelectionResult processValidDocIdMetadata(Map<String, 
String> compactionConfigs,
+      Map<String, SegmentZKMetadata> completedSegmentsMap, 
Set<Map.Entry<String, String>> responseSet) {
+    double invalidRecordsThresholdPercent = Double.parseDouble(
+        
compactionConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
+    long invalidRecordsThresholdCount = Long.parseLong(
+        
compactionConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
+    List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
+    List<String> segmentsForDeletion = new ArrayList<>();
+    for (Map.Entry<String, String> streamResponse : responseSet) {
+      JsonNode allValidDocIdMetadata;
+      try {
+        allValidDocIdMetadata = 
JsonUtils.stringToJsonNode(streamResponse.getValue());
+      } catch (IOException e) {
+        LOGGER.error("Unable to parse validDocIdMetadata response for: {}", 
streamResponse.getKey());
+        continue;
+      }
+      Iterator<JsonNode> iterator = allValidDocIdMetadata.elements();
+      while (iterator.hasNext()) {
+        JsonNode validDocIdMetadata = iterator.next();
+        long invalidRecordCount = 
validDocIdMetadata.get("totalInvalidDocs").asLong();
+        String segmentName = validDocIdMetadata.get("segmentName").asText();
+        SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);

Review Comment:
   Total docs should also be available from the `validDocIdMetadata`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java:
##########
@@ -94,6 +101,30 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String 
tableNameWithType) {
     return 
ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(),
 tableNameWithType);
   }
 
+  /**
+   * Get all server to segment mappings for the given table.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @return Map where the key is the server and the value is a List of 
segments
+   */
+  public Map<String, List<String>> getServerToSegmentsMap(String 
tableNameWithType) {

Review Comment:
   (minor) No need to add this API since we already have a getter for 
`_pinotHelixResourceManager`



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -125,8 +125,8 @@ public class TablesResource {
   //swagger annotations
   @ApiOperation(value = "List tables", notes = "List all the tables on this 
server")
   @ApiResponses(value = {
-      @ApiResponse(code = 200, message = "Success", response = 
TablesList.class),
-      @ApiResponse(code = 500, message = "Server initialization error", 
response = ErrorInfo.class)
+      @ApiResponse(code = 200, message = "Success", response = 
TablesList.class), @ApiResponse(code = 500, message =

Review Comment:
   Although this was done by the auto-formatter, let's not reformat the 
`@ApiResponse` annotations, because it makes them less readable. The same applies to the other APIs.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java:
##########
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "7d";
+  private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
+  private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0;
+
+  public static class SegmentSelectionResult {
+
+    private List<SegmentZKMetadata> _segmentsForCompaction;
+
+    private List<String> _segmentsForDeletion;
+
+    SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, 
List<String> segmentsForDeletion) {
+      _segmentsForCompaction = segmentsForCompaction;
+      _segmentsForDeletion = segmentsForDeletion;
+    }
+
+    public List<SegmentZKMetadata> getSegmentsForCompaction() {
+      return _segmentsForCompaction;
+    }
+
+    public List<String> getSegmentsForDeletion() {
+      return _segmentsForDeletion;
+    }
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      if (!validate(tableConfig)) {
+        continue;
+      }
+
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableNameWithType, taskType);
+
+      Map<String, String> taskConfigs = 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+      Map<String, String> compactionConfigs = 
getCompactionConfigs(taskConfigs);
+      List<SegmentZKMetadata> completedSegments = 
getCompletedSegments(tableNameWithType, compactionConfigs);
+
+      if (completedSegments.isEmpty()) {
+        LOGGER.info("No completed segments were eligible for compaction for 
table: {}", tableNameWithType);
+        continue;
+      }
+
+      // get server to segment mappings
+      Map<String, List<String>> serverToSegments = 
_clusterInfoAccessor.getServerToSegmentsMap(tableNameWithType);
+      PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
+      BiMap<String, String> serverToEndpoints;
+      try {
+        serverToEndpoints = 
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+      } catch (InvalidConfigException e) {
+        throw new RuntimeException(e);
+      }
+
+      Map<String, SegmentZKMetadata> completedSegmentsMap =
+          
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
+
+      List<String> validDocIdUrls;
+      try {
+        validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, 
serverToEndpoints, tableNameWithType,
+            completedSegmentsMap.keySet());
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+
+      // request the urls from the servers
+      CompletionServiceHelper completionServiceHelper =
+          new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), 
_clusterInfoAccessor.getConnectionManager(),
+              serverToEndpoints.inverse());
+
+      CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+          completionServiceHelper.doMultiGetRequest(validDocIdUrls, 
tableNameWithType, false, 3000);
+
+      SegmentSelectionResult segmentSelectionResult =
+          processValidDocIdMetadata(compactionConfigs, completedSegmentsMap, 
serviceResponse._httpResponses.entrySet());
+
+      if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
+        pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion(),
+            "0d");
+        LOGGER.info("Deleted segments containing only invalid records for 
table: {} for task: {}", tableNameWithType,

Review Comment:
   Please also log the segment names; it is very important to know which 
segments were deleted. The task type is not needed because it is already embedded in the 
logger name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to