Jackie-Jiang commented on code in PR #12886:
URL: https://github.com/apache/pinot/pull/12886#discussion_r1577299251


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment immutableSegment) {
       _logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
       return;
     }
-    // Replacing segment takes multiple steps, and particularly need to access the oldSegment. Replace segment may
-    // happen in two threads, i.e. the consuming thread that's committing the mutable segment and a HelixTaskExecutor
-    // thread that's bringing segment from ONLINE to CONSUMING when the server finds consuming thread can't commit
-    // the segment in time. The slower thread takes the reference of the oldSegment here, but it may get closed by
-    // the faster thread if not synchronized. In particular, the slower thread may iterate the primary keys in the
-    // oldSegment, causing seg fault. So we have to take a lock here.
-    // However, we can't just reuse the existing segmentLocks. Because many methods of partitionUpsertMetadataManager
-    // takes this lock internally, but after taking snapshot RW lock. If we take segmentLock here (before taking
-    // snapshot RW lock), we can get into deadlock with threads calling partitionUpsertMetadataManager's other
-    // methods, like removeSegment.
-    // Adding segment should be done by a single HelixTaskExecutor thread, but do it with lock here for simplicity
-    // otherwise, we'd need to double-check if oldSegmentManager is null.
-    Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, segmentName);
-    segmentLock.lock();
-    try {
-      SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
-      if (oldSegmentManager == null) {
-        // When adding a new segment, we should register it 'before' it is fully initialized by
-        // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other
-        // segments may be invalidated, making the queries see less valid docs than expected. We should let query
-        // access the new segment asap even though its validDocId bitmap is still being filled by
-        // partitionUpsertMetadataManager.
-        registerSegment(segmentName, newSegmentManager);
-        partitionUpsertMetadataManager.addSegment(immutableSegment);
-        _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
-      } else {
-        // When replacing a segment, we should register the new segment 'after' it is fully initialized by
-        // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access
-        // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still
-        // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected.
-        // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data
-        // consistency, otherwise the new segment should be named differently to go through the addSegment flow above.
-        IndexSegment oldSegment = oldSegmentManager.getSegment();
-        partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
-        registerSegment(segmentName, newSegmentManager);
-        _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
-            oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
-        releaseSegment(oldSegmentManager);
-      }
-    } finally {
-      segmentLock.unlock();
-    }
-  }
-
-  @Override
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
-    // Cannot download consuming segment
-    if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
-      return false;
-    }
-    // TODO: may support download from peer servers as well.
-    return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
-  }
-
-  void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata,
-      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
-    String uri = segmentZKMetadata.getDownloadUrl();
-    if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
-      try {
-        // TODO: cleanup and consolidate the segment loading logic a bit for OFFLINE and REALTIME tables.
-        //       https://github.com/apache/pinot/issues/9752
-        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
-      } catch (Exception e) {
-        _logger.warn("Download segment {} from deepstore uri {} failed.", segmentName, uri, e);
-        // Download from deep store failed; try to download from peer if peer download is setup for the table.
-        if (_peerDownloadScheme != null) {
-          downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-        } else {
-          throw e;
-        }
-      }
+    SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
+    if (oldSegmentManager == null) {
+      // When adding a new segment, we should register it 'before' it is fully initialized by
+      // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other
+      // segments may be invalidated, making the queries see less valid docs than expected. We should let query
+      // access the new segment asap even though its validDocId bitmap is still being filled by
+      // partitionUpsertMetadataManager.
+      registerSegment(segmentName, newSegmentManager);
+      partitionUpsertMetadataManager.addSegment(immutableSegment);
+      _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
     } else {
-      if (_peerDownloadScheme != null) {
-        downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-      } else {
-        throw new RuntimeException("Peer segment download not enabled for segment " + segmentName);
-      }
-    }
-  }
-
-  private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
-    // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted.
-    // This is fine since the temporary directories are deleted when the table data manager calls init.
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
-      _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
+      // When replacing a segment, we should register the new segment 'after' it is fully initialized by
+      // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access
+      // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still
+      // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected.
+      // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data
+      // consistency, otherwise the new segment should be named differently to go through the addSegment flow above.
+      IndexSegment oldSegment = oldSegmentManager.getSegment();
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
      registerSegment(segmentName, newSegmentManager);
+      _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
+      releaseSegment(oldSegmentManager);
     }
   }
 
   /**
-   * Untars the new segment and replaces the existing segment.
+   * Replaces the CONSUMING segment with a downloaded sealed one.
    */
-  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile,
-      File tempRootDir)
-      throws IOException {
-    File untarDir = new File(tempRootDir, segmentName);
-    File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, untarDir).get(0);
-    _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, untarDir);
-    File indexDir = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(indexDir);
-    FileUtils.moveDirectory(untaredSegDir, indexDir);
-    _logger.info("Replacing LLC Segment {}", segmentName);
-    replaceLLSegment(segmentName, indexLoadingConfig);
-  }
-
-  private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig indexLoadingConfig) {
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      // Next download the segment from a randomly chosen server using configured download scheme (http or https).
-      SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName, () -> {
-        List<URI> peerServerURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName,
-                _peerDownloadScheme);
-        Collections.shuffle(peerServerURIs);
-        return peerServerURIs;
-      }, segmentTarFile);
-      _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, _peerDownloadScheme,
-          e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
-    }
+  public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
+      throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
+    _logger.info("Downloading and replacing CONSUMING segment: {} with sealed one", segmentName);
+    File indexDir = downloadSegment(zkMetadata);
+    // Get a new index loading config with latest table config and schema to load the segment
+    IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null);

Review Comment:
   Good catch
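
   For context, a minimal self-contained sketch of the ordering rule described in the diff comments: a brand-new segment is registered before its validDocIds bitmap is filled, while a replacement segment is registered only after the bitmap is filled, so queries never see fewer valid docs than expected. The names below (`UpsertRegistrationSketch`, `addOrReplace`, `populateValidDocIds`) are illustrative placeholders, not Pinot APIs; they stand in for the registerSegment / addSegment / replaceSegment steps shown above.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Illustrative sketch only; not part of RealtimeTableDataManager.
   public class UpsertRegistrationSketch {
     private final ConcurrentMap<String, String> _registry = new ConcurrentHashMap<>();

     public void addOrReplace(String segmentName, String newSegment, Runnable populateValidDocIds) {
       String oldSegment = _registry.get(segmentName);
       if (oldSegment == null) {
         // New segment: register first, then fill its validDocIds bitmap, so the docs it
         // invalidates in other segments are not temporarily lost to queries.
         _registry.put(segmentName, newSegment);
         populateValidDocIds.run();
       } else {
         // Replacement: fill the bitmap first, then swap the registration, so queries keep
         // serving the old segment's valid docs until the new segment is fully initialized;
         // the old segment would be released afterwards.
         populateValidDocIds.run();
         _registry.put(segmentName, newSegment);
       }
     }

     public static void main(String[] args) {
       UpsertRegistrationSketch registry = new UpsertRegistrationSketch();
       registry.addOrReplace("seg__0__1", "mutable", () -> { });   // add path
       registry.addOrReplace("seg__0__1", "immutable", () -> { }); // replace path
     }
   }
   ```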



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

