Jackie-Jiang commented on a change in pull request #7319:
URL: https://github.com/apache/pinot/pull/7319#discussion_r699715552



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload)

Review comment:
       Annotate `localMetadata` as `@Nullable`.

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
##########
@@ -240,7 +239,7 @@ public void testReplace()
 
     runStorageServer(numQueryThreads, runTimeSec, tableDataManager);  // replaces segments while online
 
-//    System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size());
+    // System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size());

Review comment:
       Remove it

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -248,19 +249,27 @@ public boolean isPartialUpsertEnabled() {
     return _upsertMode == UpsertConfig.Mode.PARTIAL;
   }
 
+  /**
+   * Add or replace an immutable segment created by the REALTIME table.
+   * Skipping segment still in consuming state.
+   */
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload)
+      throws Exception {
+    throw new UnsupportedOperationException();

Review comment:
       Why do we throw an exception here?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
##########
@@ -146,4 +156,12 @@ void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig
    * @return List of {@link SegmentErrorInfo}
    */
   Map<String, SegmentErrorInfo> getSegmentErrors();
+
+  /**
+   * Get the dir for the given table segment, when TableDataManager object is not
+   * yet initialized for the given table.
+   */
+  static File getSegmentDataDir(String instanceDataDir, String tableNameWithType, String segmentName) {

Review comment:
       This is an instance-level method and seems to be used only in `InstanceDataManager`. Can we inline it instead of adding it as an interface method?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -248,19 +249,27 @@ public boolean isPartialUpsertEnabled() {
     return _upsertMode == UpsertConfig.Mode.PARTIAL;
   }
 
+  /**
+   * Add or replace an immutable segment created by the REALTIME table.
+   * Skipping segment still in consuming state.
+   */
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload)

Review comment:
       Annotate `localMetadata` as `@Nullable`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
##########
@@ -86,11 +86,28 @@ void reloadSegment(String tableNameWithType, String segmentName)
       throws Exception;
 
   /**
-   * Reloads all segment in a table.
+   * Reloads all segments in a table.
    */
   void reloadAllSegments(String tableNameWithType)
       throws Exception;
 
+  /**
+   * Adds or replaces a segment in a table. Different from segment reloading,
+   * 1. this method doesn't assume the existence of TableDataManager object and it
+   * can actually initialize the TableDataManager for the segment;
+   * 2. this method can download a new segment to replace the local one before loading.
+   * Download is conducted when local segment's CRC is different from the one of the
+   * remote segment, but can also be forced to do regardless of CRC difference.
+   */
+  void addOrReplaceSegment(String tableNameWithType, String segmentName, boolean forceDownload)

Review comment:
       `forceDownload` should not be part of this API. We can add separate APIs for `redownloadSegment` and `redownloadAllSegments`, or add the flag to the `reload` APIs.
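
       The first option could look roughly like the sketch below. It is only an illustration of the API shape: `SegmentOps` and `RecordingOps` are hypothetical stand-ins, not the real `InstanceDataManager`, and the method bodies just record the call.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: hypothetical types that illustrate the API shape, not Pinot code. */
final class RedownloadApiSketch {

  /** Hypothetical interface: no boolean flag, the intent is in the method name. */
  interface SegmentOps {
    void addOrReplaceSegment(String tableNameWithType, String segmentName) throws Exception;

    void redownloadSegment(String tableNameWithType, String segmentName) throws Exception;

    void redownloadAllSegments(String tableNameWithType) throws Exception;
  }

  /** Recording implementation, used only to show how call sites would read. */
  static final class RecordingOps implements SegmentOps {
    final List<String> calls = new ArrayList<>();

    @Override
    public void addOrReplaceSegment(String tableNameWithType, String segmentName) {
      calls.add("addOrReplace:" + segmentName);
    }

    @Override
    public void redownloadSegment(String tableNameWithType, String segmentName) {
      calls.add("redownload:" + segmentName);
    }

    @Override
    public void redownloadAllSegments(String tableNameWithType) {
      calls.add("redownloadAll:" + tableNameWithType);
    }
  }

  public static void main(String[] args) {
    RecordingOps ops = new RecordingOps();
    ops.addOrReplaceSegment("myTable_OFFLINE", "seg0");
    ops.redownloadSegment("myTable_OFFLINE", "seg0");
    System.out.println(ops.calls);
  }
}
```

       With this shape a caller never has to pass `true` or `false` to choose a behavior; the trade-off is a wider interface.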

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
##########
@@ -311,6 +310,47 @@ private void reloadSegment(String tableNameWithType, SegmentMetadata segmentMeta
     }
   }
 
+  @Override
+  public void addOrReplaceSegment(String tableNameWithType, String segmentName, boolean forceDownload)
+      throws Exception {
+    LOGGER.info("Adding or replacing segment: {} for table: {} downloadIsForced: {}", segmentName, tableNameWithType,
+        forceDownload);
+
+    // Get updated table config and segment metadata from Zookeeper.
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    Preconditions.checkNotNull(tableConfig);
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
+    Preconditions.checkNotNull(zkMetadata);
+
+    // This method might modify the file on disk. Use segment lock to prevent race condition
+    Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
+    try {
+      // Lock the segment to get its metadata, so that no other threads are modifying
+      // the disk files of this segment.
+      segmentLock.lock();
+      SegmentMetadata localMetadata = getSegmentMetadata(tableNameWithType, segmentName);
+
+      _tableDataManagerMap.computeIfAbsent(tableNameWithType, k -> createTableDataManager(k, tableConfig))
+          .addOrReplaceSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig),
+              localMetadata, zkMetadata, forceDownload);
+      LOGGER.info("Added or replaced segment: {} of table: {}", segmentName, tableNameWithType);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  @Override
+  public void addOrReplaceAllSegments(String tableNameWithType, boolean forceDownload)
+      throws Exception {
+    LOGGER.info("Adding or replacing all segments in table: {} downloadIsForced: {}", tableNameWithType, forceDownload);
+    List<SegmentMetadata> segMds = getAllSegmentsMetadata(tableNameWithType);
+    for (SegmentMetadata segMd : segMds) {
+      addOrReplaceSegment(tableNameWithType, segMd.getName(), forceDownload);

Review comment:
       Add a helper method that takes `TableConfig` and `SegmentZKMetadata` as arguments, and have `addOrReplaceSegment()` and `addOrReplaceAllSegments()` call it. We should avoid reading them for each segment during a bulk replace.
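
       A minimal sketch of that refactoring, under stated assumptions: `TableConfig` and `SegmentZkMetadata` below are simplified stand-ins, `fetchTableConfig` and `fetchSegmentZkMetadata` are hypothetical placeholders for the ZooKeeper reads, and the helper only records the segment name instead of loading anything. The sketch hoists only the table config read out of the loop; how the per-segment ZK metadata would be obtained in bulk is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch only: every type and method here is a stand-in, not Pinot's real class. */
final class BulkReplaceSketch {

  /** Hypothetical stand-in for the table config. */
  static final class TableConfig {
    final String tableName;

    TableConfig(String tableName) {
      this.tableName = tableName;
    }
  }

  /** Hypothetical stand-in for the per-segment ZK metadata. */
  static final class SegmentZkMetadata {
    final String segmentName;

    SegmentZkMetadata(String segmentName) {
      this.segmentName = segmentName;
    }
  }

  int tableConfigReads = 0;                       // counts simulated ZooKeeper reads
  final List<String> replaced = new ArrayList<>();
  private final Map<String, Lock> segmentLocks = new ConcurrentHashMap<>();

  /** Placeholder for reading the table config from ZooKeeper. */
  TableConfig fetchTableConfig(String table) {
    tableConfigReads++;
    return new TableConfig(table);
  }

  /** Placeholder for reading one segment's ZK metadata. */
  SegmentZkMetadata fetchSegmentZkMetadata(String table, String segment) {
    return new SegmentZkMetadata(segment);
  }

  /** Single-segment entry point: reads the config once, then delegates. */
  void addOrReplaceSegment(String table, String segment) {
    addOrReplaceSegment(fetchTableConfig(table), fetchSegmentZkMetadata(table, segment));
  }

  /** Bulk entry point: reads the config once for the whole table. */
  void addOrReplaceAllSegments(String table, List<String> segments) {
    TableConfig config = fetchTableConfig(table);
    for (String segment : segments) {
      addOrReplaceSegment(config, fetchSegmentZkMetadata(table, segment));
    }
  }

  /** Shared helper: does the work under the per-segment lock. */
  private void addOrReplaceSegment(TableConfig config, SegmentZkMetadata zkMetadata) {
    String key = config.tableName + "/" + zkMetadata.segmentName;
    Lock lock = segmentLocks.computeIfAbsent(key, k -> new ReentrantLock());
    lock.lock();  // acquired before the try, so finally only unlocks a held lock
    try {
      replaced.add(zkMetadata.segmentName);
    } finally {
      lock.unlock();
    }
  }
}
```

       In this sketch, replacing three segments in bulk performs one table config read instead of three.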

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload)
+      throws Exception {
+    if (!forceDownload && !isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // If not forced to download, then try to recover if no local metadata is provided.
+    if (!forceDownload && localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        localMetadata = null;
+      }
+    }
+
+    // Download segment and replace the local one, either due to being forced to download, or the
+    // local segment is not able to get loaded, or the segment data is updated and has new CRC now.
+    if (forceDownload) {
+      LOGGER.info("Force to download segment: {} of table: {}", segmentName, _tableNameWithType);
+    } else if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegmentFromDeepStore(segmentName, zkMetadata);
+    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and replaced segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
+        segmentMetadata.getCrc());
+  }
+
+  /**
+   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
+   * directory might not exist but segment backup directory existed. This method tries to recover from reload
+   * failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {

Review comment:
       (nit) I don't think we need `Quietly` in the name; same for `loadSegmentQuietly`.

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
##########
@@ -78,6 +80,14 @@ void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
   void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig)
       throws Exception;
 
+  /**
+   * Add or replace an immutable segment for current table, which can be either
+   * OFFLINE or REALTIME table.
+   */
+  void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      @Nullable SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload)

Review comment:
       (nit) for slightly better readability:
   ```suggestion
         SegmentZKMetadata remoteZKMetadata, @Nullable SegmentMetadata localMetadata, boolean forceDownload)
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload)
+      throws Exception {
+    if (!forceDownload && !isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // If not forced to download, then try to recover if no local metadata is provided.
+    if (!forceDownload && localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {

Review comment:
       What does quietly stand for?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -256,4 +267,147 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentMetadata localMetadata, SegmentZKMetadata zkMetadata, boolean forceDownload)
+      throws Exception {
+    if (!forceDownload && !isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;

Review comment:
       This is not correct. The segment is on local disk, but we still need to load it into memory.
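
       One way to read this: a matching CRC only says the on-disk copy is current, not that it is being served. The sketch below separates the three outcomes. It is an assumption about the intended behavior, not the PR's code; the `loadedInMemory` flag and the `Action` enum are hypothetical names introduced for illustration.

```java
/** Sketch only: hypothetical decision logic, not the code in this PR. */
final class SegmentActionSketch {

  enum Action { NOOP, LOAD_FROM_DISK, DOWNLOAD }

  /**
   * @param localCrc       CRC of the local copy, or null if no local copy exists
   * @param remoteCrc      CRC recorded in the remote (ZK) metadata
   * @param loadedInMemory whether the segment is already loaded and served (assumed input)
   * @param forceDownload  whether the caller asked for an unconditional download
   */
  static Action decide(Long localCrc, long remoteCrc, boolean loadedInMemory, boolean forceDownload) {
    if (forceDownload || localCrc == null || localCrc != remoteCrc) {
      return Action.DOWNLOAD;
    }
    // CRCs match: the disk copy is current, but it may not be loaded yet.
    return loadedInMemory ? Action.NOOP : Action.LOAD_FROM_DISK;
  }

  public static void main(String[] args) {
    System.out.println(decide(7L, 7L, false, false));
    System.out.println(decide(7L, 7L, true, false));
    System.out.println(decide(null, 7L, false, false));
  }
}
```

       Under this reading, the early `return` in the hunk above is only safe in the `NOOP` case.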




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


