liuchang0520 commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r655905507



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre-fetch the LLC segments without deep store copy.
+  public void prefetchLLCSegmentsWithoutDeepStoreCopy(String 
tableNameWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+        return;
+      }
+
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      if (!streamConfig.hasLowLevelConsumerType()) {
+        return;
+      }
+
+      long currentTimeMs = getCurrentTimeMs();
+      List<String> segmentNames = 
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+      for (String segmentName : segmentNames) {
+        try {
+          if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+            continue;
+          }
+
+          LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+          // Cache the committed LLC segments without segment store download 
url
+          if (segmentZKMetadata.getStatus() == Status.DONE &&
+              
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+          }
+        } catch (Exception e) {
+          _controllerMetrics.addValueToTableGauge(tableNameWithType,
+              
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+          LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", 
segmentName);
+        }
+      }
+  }
+
+  /**
+   * Only validate recently created LLC segment for missing deep store 
download url.
+   * The time range check is based on segment name. This step helps to 
alleviate ZK access.
+   */
+  private boolean isLLCSegmentWithinValidationRange(String segmentName, long 
currentTimeMs) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    long creationTimeMs = llcSegmentName.getCreationTimeMs();
+    return currentTimeMs - creationTimeMs < 
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading.
+   * Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);

Review comment:
       Make sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to