mcvsubbu commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r644165186



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +152,16 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private FileUploadDownloadClient _fileUploadDownloadClient;
+  // Map caching the LLC segment names without deep store download uri. 
Controller gets the LLC segment names from this map, and asks servers to upload 
the segments to segment store. This helps to alleviates excessive ZK access 
when fetching LLC segment list.

Review comment:
       Nit: line too long. Please fix all of your extra-long comment lines. 
Maybe you need to update your IDE settings?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE && 
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store 
download url. The time range check is based on segment name. This step helps to 
alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK. Since uploading to segment store 
involves expensive compression step (first tar up the segment and then upload), 
we don't want to retry the uploading. Segment without segment store copy can 
still be downloaded from peer servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}: all segments are available in segment store.", realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or 
skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    // Iterate through LLC segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download 
url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true 
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the segment status is no longer DONE, or the download url is 
already fixed, skip the fix for this segment.
+        if (segmentZKMetadata.getStatus() != Status.DONE || 
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+          continue;
+        }
+        // delay the fix to next round if not enough time elapsed since 
segment metadata update
+        if (!isExceededMinTimeToFixSegmentStoreCopy(stat)) {
+          segmentsNotFixed.offer(segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing LLC segment {} whose segment store copy is 
unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, 
_helixManager);
+        if (peerSegmentURIs.isEmpty()) {
+          throw new IllegalStateException(String.format("Failed to upload 
segment %s to segment store because no online replica is found", segmentName));
+        }
+
+        // Randomly ask one server to upload
+        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
+        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), 
"upload");
+        LOGGER.info("Ask server to upload LLC segment {} to segment store by 
this path: {}", segmentName, serverUploadRequestUrl);
+        String segmentDownloadUrl = 
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+        LOGGER.info("Updating segment {} download url in ZK to be {}", 
segmentName, segmentDownloadUrl);
+        // Update segment ZK metadata by adding the download URL
+        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, 
stat.getVersion());
+        LOGGER.info("Successfully uploaded LLC segment {} to segment store 
with download url: {}", segmentName, segmentDownloadUrl);
+      } catch (Exception e) {
+        segmentsNotFixed.offer(segmentName);
+        _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR, 1L);
+        LOGGER.error("Failed to upload segment {} to segment store", 
segmentName, e);

Review comment:
       If a segment is in the queue but the retention manager has deleted it, 
we will keep retrying it for a long time. We should make sure that the 
retention manager also removes segments from the queue as they age out of 
retention.
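
       A rough sketch of what I mean, assuming a per-table queue similar to 
`_llcSegmentMapForUpload`. The class and method names below are hypothetical 
and not part of this PR; the deletion path would call `onSegmentsDeleted`.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the per-table upload queue kept by the segment manager.
class UploadQueueRegistry {
  private final Map<String, Queue<String>> _queues = new ConcurrentHashMap<>();

  void enqueue(String tableName, String segmentName) {
    _queues.computeIfAbsent(tableName, k -> new ConcurrentLinkedQueue<>()).offer(segmentName);
  }

  // Meant to be called by whatever deletes segments (for example the retention manager),
  // so that deleted segments are not retried forever.
  void onSegmentsDeleted(String tableName, Collection<String> deletedSegments) {
    Queue<String> queue = _queues.get(tableName);
    if (queue != null) {
      queue.removeAll(deletedSegments);
    }
  }

  int pendingCount(String tableName) {
    Queue<String> queue = _queues.get(tableName);
    return queue == null ? 0 : queue.size();
  }
}
```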

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +134,11 @@
    * The segment will be eligible for repairs by the validation manager, if 
the time  exceeds this value
    */
   private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 
5 MINUTES
+  /**

Review comment:
       Nit: Can you match the comment style used in lines 129 to 135 (an 
asterisk on each new line)? Thanks.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -157,6 +181,21 @@ public 
PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
     }
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+    _isUploadingRealtimeMissingSegmentStoreCopyEnabled = 
controllerConf.isUploadingRealtimeMissingSegmentStoreCopyEnabled();
+    if (_isUploadingRealtimeMissingSegmentStoreCopyEnabled) {
+      _fileUploadDownloadClient = initFileUploadDownloadClient();
+      _llcSegmentMapForUpload = new ConcurrentHashMap<>();
+      _validationRangeForLLCSegmentsDeepStoreCopyMs = 
(long)controllerConf.getValidationRangeInDaysToCheckMissingSegmentStoreCopy() * 
24 * 3600 * 1000;

Review comment:
       Use the `TimeUnit` class to convert to millis, thanks.
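
       For example, something along these lines. `ValidationRange` and 
`rangeInDays` are placeholder names; in the PR the value would come from the 
controller config getter.

```java
import java.util.concurrent.TimeUnit;

class ValidationRange {
  // rangeInDays stands in for the value read from the controller config.
  static long toMillis(int rangeInDays) {
    // TimeUnit does the conversion in long arithmetic, so no manual cast or
    // hand-written 24 * 3600 * 1000 factor is needed.
    return TimeUnit.DAYS.toMillis(rangeInDays);
  }
}
```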

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
##########
@@ -49,7 +49,8 @@
   NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
   NUMBER_TASKS_SUBMITTED("tasks", false),
   NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
-  CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false);
+  CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
+  
NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR("llcSegmentDeepStoreUploadFixError",
 true);

Review comment:
       Please make this a gauge. You can set the gauge to the number of 
segments in `_llcSegmentMapForUpload`.  It is best not to make it a per-table 
value, since all tables are likely to have these errors. 
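
       Roughly like the sketch below. The `AtomicLong` is only a stand-in for 
a global controller gauge, and the class name is made up; the real code would 
set the gauge through the controller metrics instead.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class PendingUploadGauge {
  private final Map<String, Queue<String>> _llcSegmentMapForUpload = new ConcurrentHashMap<>();
  // Stand-in for a global (not per-table) gauge; a metrics registry would replace this.
  private final AtomicLong _gaugeValue = new AtomicLong();

  void enqueue(String tableName, String segmentName) {
    _llcSegmentMapForUpload.computeIfAbsent(tableName, k -> new ConcurrentLinkedQueue<>()).offer(segmentName);
  }

  // Sets the gauge to the total number of queued segments across all tables.
  void refresh() {
    long total = 0;
    for (Queue<String> queue : _llcSegmentMapForUpload.values()) {
      total += queue.size();
    }
    _gaugeValue.set(total);
  }

  long value() {
    return _gaugeValue.get();
  }
}
```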

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +134,11 @@
    * The segment will be eligible for repairs by the validation manager, if 
the time  exceeds this value
    */
   private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; // 
5 MINUTES
+  /**
+   * Controller waits this amount of time before asking servers to upload LLC 
segments without deep store copy. The reason is after step 1 of segment 
completion is done (segment ZK metadata status changed to be DONE), servers are 
still in the process of loading segments. Only after that segments are in 
ONLINE status in external view for the controller to discover.

Review comment:
       ```suggestion
      * Controller waits this amount of time before asking servers to upload 
LLC segments without deep store copy. 
      * The reason is after step 1 of segment completion is done (segment ZK 
metadata status changed to be 
      * DONE), servers may still be in the process of loading segments. Only 
after that segments are in ONLINE 
      * status in external view for the controller to discover.
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE && 
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", 
segmentName);

Review comment:
       This could be bad, right? If we are not able to fetch segment metadata 
here, that segment will stay missing from the deep store for a long time, until 
the next restart or mastership shift. I think this needs a metric bumped.
   ```suggestion
             LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", 
segmentName);
   ```
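
   A sketch of the shape I have in mind, with the exception included in the 
log. The counter is a stand-in for a controller meter and `lacksDeepStoreCopy` 
stands in for the ZK metadata read; both names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

class PrefetchWithErrorMetric {
  // Stand-in for a controller meter that is bumped on every failed metadata fetch.
  final AtomicLong _prefetchErrors = new AtomicLong();

  // lacksDeepStoreCopy stands in for reading the segment ZK metadata and checking
  // status and download url; it may throw if the read fails.
  List<String> prefetch(List<String> segmentNames, Predicate<String> lacksDeepStoreCopy) {
    List<String> toUpload = new ArrayList<>();
    for (String segmentName : segmentNames) {
      try {
        if (lacksDeepStoreCopy.test(segmentName)) {
          toUpload.add(segmentName);
        }
      } catch (Exception e) {
        // Count the failure so it is visible, and keep the cause in the log line.
        _prefetchErrors.incrementAndGet();
        System.err.println("Failed to fetch the LLC segment " + segmentName + " ZK metadata: " + e);
      }
    }
    return toUpload;
  }
}
```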

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {

Review comment:
       I would prefer a method name like `isOlderThan` or `isNewerThan`.
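
       For instance, a helper shaped like this. The signature is only a 
suggestion; it takes the creation time directly instead of parsing the segment 
name, to keep the sketch self-contained.

```java
class SegmentAge {
  // True when the segment was created less than maxAgeMs before currentTimeMs.
  static boolean isNewerThan(long creationTimeMs, long currentTimeMs, long maxAgeMs) {
    return currentTimeMs - creationTimeMs < maxAgeMs;
  }
}
```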

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE && 
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store 
download url. The time range check is based on segment name. This step helps to 
alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK. Since uploading to segment store 
involves expensive compression step (first tar up the segment and then upload), 
we don't want to retry the uploading. Segment without segment store copy can 
still be downloaded from peer servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}: all segments are available in segment store.", realtimeTableName);

Review comment:
       In a cluster with 1000s of tables, we will keep seeing this message. 
Remove this log.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {

Review comment:
       We should check for leadership of the table here. If this controller is 
not the leader, remove the table's segments from the queue.
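
       Something like the sketch below. The leadership check is passed in as 
a `Predicate` because I am not assuming any particular API for it here; the 
class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

class LeaderAwareUploadQueues {
  private final Map<String, Queue<String>> _queues = new ConcurrentHashMap<>();
  // Stand-in for the real "is this controller the leader for the table" check.
  private final Predicate<String> _isLeaderForTable;

  LeaderAwareUploadQueues(Predicate<String> isLeaderForTable) {
    _isLeaderForTable = isLeaderForTable;
  }

  void enqueue(String tableName, String segmentName) {
    _queues.computeIfAbsent(tableName, k -> new ConcurrentLinkedQueue<>()).offer(segmentName);
  }

  // Returns the queue to process, or null after dropping it when this
  // controller is not the leader for the table.
  Queue<String> queueIfLeader(String tableName) {
    if (!_isLeaderForTable.test(tableName)) {
      _queues.remove(tableName);
      return null;
    }
    return _queues.get(tableName);
  }

  boolean hasQueue(String tableName) {
    return _queues.containsKey(tableName);
  }
}
```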

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE && 
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store 
download url. The time range check is based on segment name. This step helps to 
alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK. Since uploading to segment store 
involves expensive compression step (first tar up the segment and then upload), 
we don't want to retry the uploading. Segment without segment store copy can 
still be downloaded from peer servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}: all segments are available in segment store.", realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or 
skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    // Iterate through LLC segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download 
url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true 
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the segment status is no longer DONE, or the download url is 
already fixed, skip the fix for this segment.
+        if (segmentZKMetadata.getStatus() != Status.DONE || 
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {

Review comment:
       A segment will not make it to the queue until the status is DONE. Why do 
we have this check?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE && 
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          LOGGER.error("failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  // Only validate recently created LLC segment for missing deep store 
download url. The time range check is based on segment name. This step helps to 
alleviate ZK access.
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK. Since uploading to segment store 
involves expensive compression step (first tar up the segment and then upload), 
we don't want to retry the uploading. Segment without segment store copy can 
still be downloaded from peer servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}: all segments are available in segment store.", realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or 
skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    // Iterate through LLC segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding segment store download 
url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true 
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the segment status is no longer DONE, or the download url is 
already fixed, skip the fix for this segment.
+        if (segmentZKMetadata.getStatus() != Status.DONE || 
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+          continue;

Review comment:
       Log a message here and at every point below as needed.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
##########
@@ -67,6 +67,22 @@ public RealtimeSegmentValidationManager(ControllerConf 
config, PinotHelixResourc
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
   }
 
+  @Override
+  protected void setUpTask() {
+    // Prefetch the LLC segment without segment store copy from ZK, which 
helps to alleviate ZK access.

Review comment:
       Are you sure that the controller leadership for a table is established 
by this time? We should do this in a callback from LeadControllerManager. And 
yes, this needs to be coded up. There is no callback right now, afaik.
   
   @jackjlli  can help.
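
   To make the idea concrete, the callback could look roughly like this. 
Everything here is hypothetical: as noted above, no such callback exists today, 
and these type names are placeholders rather than existing classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener that components such as the validation manager could implement.
interface TableLeadershipListener {
  void onBecomeLeader(String tableName);

  void onLoseLeadership(String tableName);
}

// Hypothetical notifier; in practice this role would belong to the leadership manager.
class LeadershipNotifier {
  private final List<TableLeadershipListener> _listeners = new CopyOnWriteArrayList<>();

  void register(TableLeadershipListener listener) {
    _listeners.add(listener);
  }

  // Prefetching the segment list would be triggered from onBecomeLeader.
  void leadershipGained(String tableName) {
    for (TableLeadershipListener listener : _listeners) {
      listener.onBecomeLeader(tableName);
    }
  }

  // Clearing the table's queue would be triggered from onLoseLeadership.
  void leadershipLost(String tableName) {
    for (TableLeadershipListener listener : _listeners) {
      listener.onLoseLeadership(tableName);
    }
  }
}
```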

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String 
segmentName, InputStream
     return uploadSegment(uri, segmentName, inputStream, null, parameters, 
DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**
+   * Controller periodic task uses this endpoint to ask servers to upload 
committed llc segment to segment store if missing.
+   * @param uri The uri to ask servers to upload segment to segment store
+   * @return the uploaded segment download url from segment store
+   * @throws URISyntaxException
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public String uploadToSegmentStore(String uri)

Review comment:
       +1, I agree with Jackie. 
   



