chenboat commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r636667636



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1273,112 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  // Pre fetch the LLC segment without deep store copy.
+  public void prefetchLLCSegmentWithoutDeepStoreCopy(String tableNameWithType) 
{
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == TableType.REALTIME) {
+        TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+        if (tableConfig == null) {
+          LOGGER.warn("Failed to find table config for table: {}", 
tableNameWithType);
+          return;
+        }
+
+        PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+            IngestionConfigUtils.getStreamConfigMap(tableConfig));
+        if (!streamConfig.hasLowLevelConsumerType()) {
+          return;
+        }
+
+        List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = 
ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, 
tableNameWithType);
+        for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : 
segmentZKMetadataList) {
+          // Cache the committed llc segment without segment store copy
+          // TODO: only cache the recently created LLC segment for optimization
+          if (segmentZKMetadata.getStatus() == Status.DONE && 
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+            cacheLLCSegmentNameForUpload(tableNameWithType, 
segmentZKMetadata.getSegmentName());
+          }
+        }
+      }
+  }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, 
and add segment store download uri in ZK. Since uploading to segment store 
involves expensive compression step (first tar up the segment and then upload), 
we don't want to retry the uploading. Segment without segment store copy can 
still be downloaded from peer servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    if (_isStopping) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}, because segment manager is stopping.", realtimeTableName);
+      return;
+    }
+
+    Queue<String> segmentQueue = 
_llcSegmentMapForUpload.get(realtimeTableName);
+    if (segmentQueue == null || segmentQueue.isEmpty()) {
+      LOGGER.info("Skipped fixing segment store copy of LLC segments for table 
{}: all segments are available in segment store.", realtimeTableName);
+      return;
+    }
+
+    // Store the segments to be fixed again in the case of fix failure, or 
skip in this round
+    Queue<String> segmentsNotFixed = new LinkedList<>();
+
+    // Iterate through llc segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the llc segment ZK metadata by adding segment store download 
url.
+    while (!segmentQueue.isEmpty()) {
+      String segmentName = segmentQueue.poll();
+      // Check if it's null in case of the while condition doesn't stand true 
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+      if (segmentName == null) {
+        break;
+      }
+
+      try {
+        Stat stat = new Stat();
+        LLCRealtimeSegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // if the segment status is no longer DONE, or the download url is 
already fixed, skip the fix for this segment.
+        if (segmentZKMetadata.getStatus() != Status.DONE || 
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
+          continue;
+        }
+        // delay the fix to next round if not enough time elapsed since 
segment metadata update
+        if (!isExceededMinTimeToFixSegmentStoreCopy(stat)) {
+          segmentsNotFixed.offer(segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing llc segment {} whose segment store copy is 
unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, 
_helixManager);
+        if (peerSegmentURIs.isEmpty()) {
+          throw new IllegalStateException(String.format("Failed to upload 
segment %s to segment store because no online replica is found", segmentName));

Review comment:
       Should we put the segment back into the segmentsNotFixed queue and retry 
later? There could be multiple reasons why there are no online replicas.
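
       To make the suggestion concrete, here is a minimal, self-contained sketch 
of the requeue-instead-of-throw idea. It is not the PR's code: the class 
RequeueSketch, the method fixRound, and the peerFinder function are hypothetical 
stand-ins (peerFinder plays the role of the peer URI lookup), and the upload and 
ZK metadata update steps are reduced to recording the segment name. How the 
actual method handles the exception is not visible in this hunk, so treat this 
only as an illustration of the control flow.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical sketch: none of these names come from the Pinot code base.
class RequeueSketch {

  /**
   * Runs one fix round over segmentQueue. Segments for which peerFinder returns
   * no online replica are deferred to the next round rather than failing.
   * Returns the names of the segments treated as fixed in this round.
   */
  static List<String> fixRound(Queue<String> segmentQueue,
      Function<String, List<String>> peerFinder) {
    // Segments to retry in a later round.
    Queue<String> segmentsNotFixed = new LinkedList<>();
    List<String> fixed = new ArrayList<>();

    String segmentName;
    // poll() returns null once the queue is empty, which ends the loop.
    while ((segmentName = segmentQueue.poll()) != null) {
      List<String> peerUris = peerFinder.apply(segmentName);
      if (peerUris.isEmpty()) {
        // No online replica right now: keep the segment for a later retry
        // instead of throwing.
        segmentsNotFixed.offer(segmentName);
        continue;
      }
      // Placeholder for "ask a server to upload, then update ZK metadata".
      fixed.add(segmentName);
    }

    // Put the deferred segments back so the next round sees them again.
    segmentQueue.addAll(segmentsNotFixed);
    return fixed;
  }
}
```

       With a queue holding two segments where only the first has an online 
replica, one call to fixRound returns the first segment and leaves the second in 
the queue for the next round. A real implementation would probably also want a 
retry limit or a log line, so that a segment that never gets an online replica 
does not get requeued silently forever.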




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
