Jackie-Jiang commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r617862992



##########
File path: 
pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
##########
@@ -76,7 +77,8 @@
   protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE =
       new 
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0, 
System.currentTimeMillis())
           .getSegmentName();
-  protected static final String SEGMENT_DOWNLOAD_URL = 
"testSegmentDownloadUrl";
+  protected static final String SEGMENT_DOWNLOAD_URL =

Review comment:
       Do we reconstruct the download URL from this string? Trying to 
understand why a dummy download url won't work

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +152,11 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final CloseableHttpClient _httpClient;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private Random _rand = new Random();

Review comment:
       (nit) make it static
   ```suggestion
     private static final Random RANDOM = new Random();
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1239,86 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Validate the committed low level consumer segments to see if its segment 
store copy is available. Fix the missing segment store copy by asking servers 
to upload to segment store.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading. Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    String realtimeTableName = tableConfig.getTableName();
+    // Get all the LLC segment ZK metadata for this table
+    List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = 
ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, 
realtimeTableName);
+
+    // Iterate through llc segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the llc segment ZK metadata by adding segment store download 
url.
+    for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : 
segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      // Only fix the committed llc segment without segment store copy
+      if (segmentZKMetadata.getStatus() == Status.DONE && 
(segmentZKMetadata.getDownloadUrl() == null || 
segmentZKMetadata.getDownloadUrl().isEmpty() || 
segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD)))
 {
+        try {
+          if (!isExceededMaxSegmentCompletionTime(realtimeTableName, 
segmentName, getCurrentTimeMs())) {
+            continue;
+          }
+          LOGGER.info("Fixing llc segment {} whose segment store copy is 
unavailable", segmentName);
+
+          // Find servers which have online replica
+          List<URI> peerSegmentURIs = PeerServerSegmentFinder
+              .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, 
_helixManager);
+          if (peerSegmentURIs.isEmpty()) {
+            LOGGER.error("Failed to upload segment {} to segment store because 
no online replica is found", segmentName);
+            continue;
+          }
+
+          // Randomly ask one server to upload
+          URI uri = peerSegmentURIs.get(_rand.nextInt(peerSegmentURIs.size()));
+          String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), 
"upload");
+          LOGGER.info("Ask server to upload llc segment to segment store by 
this path: {}", serverUploadRequestUrl);
+          String segmentDownloadUrl = 
uploadLLCSegmentByServer(serverUploadRequestUrl);
+
+          // Update the segment ZK metadata to include segment download url
+          if (segmentDownloadUrl == null || segmentDownloadUrl.isEmpty()) {
+            LOGGER.error("Failed to upload segment {} to segment store: no 
segment download url is returned from server.", segmentName);
+            continue;
+          }
+          segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+          persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
+          LOGGER.info("Successfully uploaded llc segment {} to segment store", 
segmentName);
+        } catch (Exception e) {
+          LOGGER.error("Failed to upload segment {} to segment store", 
segmentName, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to ask server to upload llc segment to segment store
+   */
+  private String uploadLLCSegmentByServer(String serverUploadRequestUrl)
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    HttpUriRequest request = buildServerUploadRequest(serverUploadRequestUrl);
+    try (CloseableHttpResponse response = _httpClient.execute(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      String responseStr = response.getEntity() == null ? "" : 
EntityUtils.toString(response.getEntity());
+      if (statusCode >= 300) {
+        StringBuilder errorMsgBuilder = new 
StringBuilder(String.format("Failed to ask server to upload LLC segment to 
segment store by url: %s. ", serverUploadRequestUrl));
+        if (!responseStr.isEmpty()) {
+          errorMsgBuilder.append(responseStr);
+        }
+        throw new HttpErrorStatusException(errorMsgBuilder.toString(), 
statusCode);

Review comment:
       We might want to log the error instead of throwing exception because it 
will abort the following segment upload

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1239,86 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Validate the committed low level consumer segments to see if its segment 
store copy is available. Fix the missing segment store copy by asking servers 
to upload to segment store.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading. Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    String realtimeTableName = tableConfig.getTableName();
+    // Get all the LLC segment ZK metadata for this table
+    List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = 
ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, 
realtimeTableName);
+
+    // Iterate through llc segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the llc segment ZK metadata by adding segment store download 
url.
+    for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : 
segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      // Only fix the committed llc segment without segment store copy
+      if (segmentZKMetadata.getStatus() == Status.DONE && 
(segmentZKMetadata.getDownloadUrl() == null || 
segmentZKMetadata.getDownloadUrl().isEmpty() || 
segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD)))
 {
+        try {
+          if (!isExceededMaxSegmentCompletionTime(realtimeTableName, 
segmentName, getCurrentTimeMs())) {
+            continue;
+          }
+          LOGGER.info("Fixing llc segment {} whose segment store copy is 
unavailable", segmentName);
+
+          // Find servers which have online replica
+          List<URI> peerSegmentURIs = PeerServerSegmentFinder
+              .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, 
_helixManager);
+          if (peerSegmentURIs.isEmpty()) {
+            LOGGER.error("Failed to upload segment {} to segment store because 
no online replica is found", segmentName);
+            continue;
+          }
+
+          // Randomly ask one server to upload
+          URI uri = peerSegmentURIs.get(_rand.nextInt(peerSegmentURIs.size()));
+          String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), 
"upload");
+          LOGGER.info("Ask server to upload llc segment to segment store by 
this path: {}", serverUploadRequestUrl);
+          String segmentDownloadUrl = 
uploadLLCSegmentByServer(serverUploadRequestUrl);
+
+          // Update the segment ZK metadata to include segment download url
+          if (segmentDownloadUrl == null || segmentDownloadUrl.isEmpty()) {
+            LOGGER.error("Failed to upload segment {} to segment store: no 
segment download url is returned from server.", segmentName);
+            continue;
+          }
+          segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+          persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);

Review comment:
       Here we want to do version check in case the metadata is changed

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -91,6 +105,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.spi.utils.CommonConstants.Server.SegmentCompletionProtocol.DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS;

Review comment:
       (nit) avoid static import

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
##########
@@ -100,6 +100,7 @@ protected void processTable(String tableNameWithType, 
Context context) {
           IngestionConfigUtils.getStreamConfigMap(tableConfig));
       if (streamConfig.hasLowLevelConsumerType()) {
         _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfig);
+        _llcRealtimeSegmentManager.uploadToSegmentStoreIfMissing(tableConfig);

Review comment:
       We might want to make it configurable in case users want to run pinot 
without deep storage

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1239,86 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Validate the committed low level consumer segments to see if its segment 
store copy is available. Fix the missing segment store copy by asking servers 
to upload to segment store.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading. Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    String realtimeTableName = tableConfig.getTableName();
+    // Get all the LLC segment ZK metadata for this table
+    List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = 
ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, 
realtimeTableName);
+
+    // Iterate through llc segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the llc segment ZK metadata by adding segment store download 
url.
+    for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : 
segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      // Only fix the committed llc segment without segment store copy
+      if (segmentZKMetadata.getStatus() == Status.DONE && 
(segmentZKMetadata.getDownloadUrl() == null || 
segmentZKMetadata.getDownloadUrl().isEmpty() || 
segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD)))
 {
+        try {
+          if (!isExceededMaxSegmentCompletionTime(realtimeTableName, 
segmentName, getCurrentTimeMs())) {

Review comment:
       I don't think we need this check if the segment is already committed

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +152,11 @@
   private final Lock[] _idealStateUpdateLocks;
   private final TableConfigCache _tableConfigCache;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final CloseableHttpClient _httpClient;

Review comment:
       Close the client in `stop()`

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1239,86 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Validate the committed low level consumer segments to see if its segment 
store copy is available. Fix the missing segment store copy by asking servers 
to upload to segment store.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading. Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    String realtimeTableName = tableConfig.getTableName();
+    // Get all the LLC segment ZK metadata for this table
+    List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = 
ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, 
realtimeTableName);
+
+    // Iterate through llc segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the llc segment ZK metadata by adding segment store download 
url.
+    for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : 
segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      // Only fix the committed llc segment without segment store copy
+      if (segmentZKMetadata.getStatus() == Status.DONE && 
(segmentZKMetadata.getDownloadUrl() == null || 
segmentZKMetadata.getDownloadUrl().isEmpty() || 
segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD)))
 {

Review comment:
       Can be simplified?
   ```suggestion
         if (segmentZKMetadata.getStatus() == Status.DONE && 
segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD))
 {
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1239,86 @@ private int 
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / 
numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Validate the committed low level consumer segments to see if its segment 
store copy is available. Fix the missing segment store copy by asking servers 
to upload to segment store.
+   * Since uploading to segment store involves expensive compression step 
(first tar up the segment and then upload), we don't want to retry the 
uploading. Segment without segment store copy can still be downloaded from peer 
servers.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    String realtimeTableName = tableConfig.getTableName();
+    // Get all the LLC segment ZK metadata for this table
+    List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList = 
ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore, 
realtimeTableName);
+
+    // Iterate through llc segments and upload missing segment store copy by 
following steps:
+    //  1. Ask servers which have online segment replica to upload to segment 
store. Servers return segment store download url after successful uploading.
+    //  2. Update the llc segment ZK metadata by adding segment store download 
url.
+    for (LLCRealtimeSegmentZKMetadata segmentZKMetadata : 
segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      // Only fix the committed llc segment without segment store copy
+      if (segmentZKMetadata.getStatus() == Status.DONE && 
(segmentZKMetadata.getDownloadUrl() == null || 
segmentZKMetadata.getDownloadUrl().isEmpty() || 
segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD)))
 {
+        try {
+          if (!isExceededMaxSegmentCompletionTime(realtimeTableName, 
segmentName, getCurrentTimeMs())) {
+            continue;
+          }
+          LOGGER.info("Fixing llc segment {} whose segment store copy is 
unavailable", segmentName);
+
+          // Find servers which have online replica
+          List<URI> peerSegmentURIs = PeerServerSegmentFinder
+              .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, 
_helixManager);
+          if (peerSegmentURIs.isEmpty()) {
+            LOGGER.error("Failed to upload segment {} to segment store because 
no online replica is found", segmentName);
+            continue;
+          }
+
+          // Randomly ask one server to upload
+          URI uri = peerSegmentURIs.get(_rand.nextInt(peerSegmentURIs.size()));
+          String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), 
"upload");
+          LOGGER.info("Ask server to upload llc segment to segment store by 
this path: {}", serverUploadRequestUrl);
+          String segmentDownloadUrl = 
uploadLLCSegmentByServer(serverUploadRequestUrl);
+
+          // Update the segment ZK metadata to include segment download url
+          if (segmentDownloadUrl == null || segmentDownloadUrl.isEmpty()) {

Review comment:
       (nit) Seems it won't be `null`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to