mcvsubbu commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r655517240



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
##########
@@ -142,6 +148,8 @@ private static long getRandomInitialDelayInSeconds() {
 
     private static final int 
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
     private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 
60 * 60;
+
+    private static final int 
DEFAULT_VALIDATION_RANGE_IN_DAYS_TO_CHECK_MISSING_SEGMENT_STORE_COPY = 3;

Review comment:
       Please keep the units at the end of the member name (`*_IN_DAYS`). Also, try to shorten the name if possible.
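   For illustration only, one possibility that keeps the unit suffix at the end (the actual name is the PR author's call; this is not from the PR):

```java
// Hypothetical shorter name; illustrative only, not the name adopted in the PR.
private static final int DEFAULT_DEEP_STORE_RETRY_UPLOAD_RANGE_IN_DAYS = 3;
```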

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
##########
@@ -67,6 +67,12 @@
   // Percentage of segments we failed to get size for
   
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent",
 false),
 
+  // Number of errors during segment store upload retry of LLC segment
+  
NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY("LLCSegmentDeepStoreUploadRetryError",
 false),

Review comment:
       Why gauge? These two should be meters.
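   For illustration, the emission would then go through a `ControllerMeter` entry and `addMeteredTableValue` rather than a table gauge (the meter name below is a sketch, not necessarily what the PR ends up using):

```java
// Sketch, assuming a new ControllerMeter entry is added following the pattern
// of the existing entries in that enum; the constant name is illustrative.
_controllerMetrics.addMeteredTableValue(tableNameWithType,
    ControllerMeter.LLC_SEGMENT_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
```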

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addValueToTableGauge(tableNameWithType,
+              
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  /**
+   * Only validate recently created LLC segment for missing deep store 
download url.
+   * The time range check is based on segment name. This step helps to 
alleviate ZK access.
+   */
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading.
+   * Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);

Review comment:
       If there is nothing to fix, then this message is misleading: an operator reading it will think that something needs to be fixed for the table. Move the logic below line 1358, once you have ascertained that there is something to fix for the table.
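   For illustration, the reordering being asked for could look roughly like this (the names are the ones already in the PR; the log wording is only a sketch):

```java
// Sketch: decide whether there is anything to fix first, and only then log
// about the stopping state, so the message is emitted only when it is relevant.
Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
if (segmentQueue == null || segmentQueue.isEmpty()) {
  return;  // nothing to fix for this table, no log needed
}
if (_isStopping) {
  LOGGER.info("Skipped fixing deep store copy of LLC segments for table {} because segment manager is stopping",
      realtimeTableName);
  return;
}
```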

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addValueToTableGauge(tableNameWithType,
+              
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  /**
+   * Only validate recently created LLC segment for missing deep store 
download url.
+   * The time range check is based on segment name. This step helps to 
alleviate ZK access.
+   */
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading.
+   * Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or 
skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()),
+        
Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+
+    // Iterate through LLC segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download 
url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true 
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the download url is already fixed, skip the fix for this segment.
+        if 
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+          LOGGER.info("Skipped fixing LLC segment {} whose deep store download 
url is already available", segmentName);
+          continue;
+        }
+        // skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy.isPurgeable(realtimeTableName, 
segmentZKMetadata)) {

Review comment:
       What happens if the retention is a short time away, and the retention 
manager walks in and starts to remove the segment while we are trying to upload 
it?  I think we have a race condition here that we need to think about.
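   One possible mitigation, sketched only to make the concern concrete (the one-hour buffer and the conservative retention strategy are assumptions of this sketch, not part of the PR): skip segments that are already close to the retention boundary, and use the znode version read earlier when writing the download URL back, so a concurrent change fails the update instead of racing silently.

```java
// Fragment meant to sit inside the upload loop. Assumes
// persistSegmentZKMetadata(table, metadata, expectedVersion) performs a
// version-checked write; the one-hour buffer is an illustrative value.
long retentionMs = TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
    .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
RetentionStrategy conservativeRetention =
    new TimeRetentionStrategy(TimeUnit.MILLISECONDS, retentionMs - TimeUnit.HOURS.toMillis(1));
if (conservativeRetention.isPurgeable(realtimeTableName, segmentZKMetadata)) {
  continue;  // too close to (or past) expiry; let the retention manager own it
}
// ... ask the server to upload, set the returned download url on segmentZKMetadata ...
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, stat.getVersion());
```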

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String 
segmentName, InputStream
     return uploadSegment(uri, segmentName, inputStream, null, parameters, 
DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**
+   * Controller periodic task uses this endpoint to ask servers to upload 
committed llc segment to segment store if missing.
+   * @param uri The uri to ask servers to upload segment to segment store
+   * @return the uploaded segment download url from segment store
+   * @throws URISyntaxException
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public String uploadToSegmentStore(String uri)

Review comment:
       How was this resolved?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addValueToTableGauge(tableNameWithType,
+              
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  /**
+   * Only validate recently created LLC segment for missing deep store 
download url.
+   * The time range check is based on segment name. This step helps to 
alleviate ZK access.
+   */
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {

Review comment:
       Remove the method and fold the logic into the place where it is called.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  /**
+   * Map caching the LLC segment names without deep store download uri.
+   * Controller gets the LLC segment names from this map, and asks servers to 
upload the segments to segment store.

Review comment:
       ```suggestion
     * Controller gets the LLC segment names from this map, and asks servers to upload the segments to deep store.
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +137,14 @@
    * The segment will be eligible for repairs by the validation manager, if 
the time  exceeds this value
    */
   private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 
5 MINUTES
+  /**
+   * Controller waits this amount of time before asking servers to upload LLC 
segments without deep store copy.
+   * The reason is after step 1 of segment completion is done (segment ZK 
metadata status changed to be DONE),
+   * servers may be still in the process of loading segments.

Review comment:
       ```suggestion
     * servers may still be in the process of transitioning segments from CONSUMING to ONLINE state
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  /**
+   * Map caching the LLC segment names without deep store download uri.
+   * Controller gets the LLC segment names from this map, and asks servers to 
upload the segments to segment store.
+   * This helps to alleviates excessive ZK access when fetching LLC segment 
list.
+   * Key: table name; Value: LLC segment names to be uploaded to segment store.

Review comment:
       ```suggestion
      * Key: table name; Value: LLC segment names to be uploaded to deep store.
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
##########
@@ -121,6 +121,12 @@
     public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
         "controller.segmentRelocator.initialDelayInSeconds";
 
+    // configs for uploading missing LLC segments copy to segment store
+    public static final String 
ENABLE_UPLOAD_MISSING_LLC_SEGMENT_TO_SEGMENT_STORE =

Review comment:
       nit: `ENABLE_DEEP_STORE_LLC_SEGMENT_CHECK`? I don't know, I am just thinking of a shorter name here.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  /**
+   * Map caching the LLC segment names without deep store download uri.
+   * Controller gets the LLC segment names from this map, and asks servers to 
upload the segments to segment store.
+   * This helps to alleviates excessive ZK access when fetching LLC segment 
list.
+   * Key: table name; Value: LLC segment names to be uploaded to segment store.
+   */
+  private Map<String, Queue<String>> _llcSegmentMapForUpload;

Review comment:
       Why is the value a `Queue`? Is there a notion of ordering here?
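   If no ordering is needed, something like this sketch would also deduplicate segment names for free (the field name here is illustrative, not the one in the PR):

```java
// Sketch: a concurrent set per table instead of a queue, assuming no ordering
// is required and duplicate cache attempts should be no-ops.
private final Map<String, Set<String>> _llcSegmentsMissingDeepStoreCopy = new ConcurrentHashMap<>();

private void cacheLLCSegmentNameForUpload(String tableNameWithType, String segmentName) {
  _llcSegmentsMissingDeepStoreCopy
      .computeIfAbsent(tableNameWithType, k -> ConcurrentHashMap.newKeySet())
      .add(segmentName);
}
```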

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +137,14 @@
    * The segment will be eligible for repairs by the validation manager, if 
the time  exceeds this value
    */
   private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 
5 MINUTES
+  /**
+   * Controller waits this amount of time before asking servers to upload LLC 
segments without deep store copy.

Review comment:
       ```suggestion
     * Controller waits this amount of time before asking servers to upload LLC segments missing in deep store.
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addValueToTableGauge(tableNameWithType,
+              
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  /**
+   * Only validate recently created LLC segment for missing deep store 
download url.
+   * The time range check is based on segment name. This step helps to 
alleviate ZK access.
+   */
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading.
+   * Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or 
skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()),
+        
Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+
+    // Iterate through LLC segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download 
url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true 
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {

Review comment:
       I don't understand this null check. We already checked above that the queue is not empty. Unless there are other threads modifying the queue (in which case we need proper synchronization rather than an if check everywhere), why check for null again?
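   If concurrent access is the reason, polling in the loop condition makes the emptiness check and the dequeue a single step, and the extra null check goes away, roughly:

```java
// Sketch: poll() returns null when the queue is empty, so the loop condition
// covers both the emptiness check and the dequeue in one call.
String segmentName;
while ((segmentName = segmentQueue.poll()) != null) {
  // ... attempt the deep store fix for segmentName ...
}
```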

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  /**
+   * Map caching the LLC segment names without deep store download uri.

Review comment:
       ```suggestion
     * Map caching the LLC segment names that are missing deep store download uri in segment metadata.
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addValueToTableGauge(tableNameWithType,
+              
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  /**
+   * Only validate recently created LLC segment for missing deep store 
download url.
+   * The time range check is based on segment name. This step helps to 
alleviate ZK access.
+   */
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading.
+   * Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or 
skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()),
+        
Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+
+    // Iterate through LLC segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download 
url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true 
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the download url is already fixed, skip the fix for this segment.

Review comment:
       Can you elaborate on how this can happen?
   

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  /**
+   * Map caching the LLC segment names without deep store download uri.
+   * Controller gets the LLC segment names from this map, and asks servers to 
upload the segments to segment store.
+   * This helps to alleviates excessive ZK access when fetching LLC segment 
list.

Review comment:
       ```suggestion
     * A cache helps to alleviate excessive ZK access when fetching LLC segment list.
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
##########
@@ -67,6 +67,22 @@ public RealtimeSegmentValidationManager(ControllerConf 
config, PinotHelixResourc
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
   }
 
+  @Override
+  protected void setUpTask() {
+    // Prefetch the LLC segment without segment store copy from ZK, which 
helps to alleviate ZK access.

Review comment:
       Where is the TODO? Please add it here; otherwise someone going through the code will have the same question (or worse, may make the same assumption in other places). Make it clear that this may not work all the time, and that it is a TODO to take action when leadership is established.
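   Something along these lines is what is being asked for (the wording and the helper calls in this sketch are mine, not taken from the PR):

```java
@Override
protected void setUpTask() {
  // TODO: prefetching here does not account for controller leadership changes;
  // the cache may be empty or stale on whichever controller ends up leading
  // these tables. Re-run the prefetch when leadership over a table is acquired.
  for (String tableNameWithType : _pinotHelixResourceManager.getAllRealtimeTables()) {
    _llcRealtimeSegmentManager.prefetchLLCSegmentsWithoutDeepStoreCopy(tableNameWithType);
  }
}
```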

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {

Review comment:
       This check does not belong here. It belongs in the place where you decide to ask the server to upload to deep store. Please move it. Also, we don't need a method for this; it is two lines long. Fold it in.
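   For illustration, the folded-in form inside the upload loop could look roughly like this, reusing the names already in the PR:

```java
// Sketch: the freshness check inlined right where the controller is about to
// ask a server to upload, instead of in a helper used during ZK prefetch.
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() >= _validationRangeForLLCSegmentsDeepStoreCopyMs) {
  continue;  // outside the retry window; skip asking the server to upload
}
```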




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
