Jackie-Jiang commented on code in PR #12886: URL: https://github.com/apache/pinot/pull/12886#discussion_r1577299251
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment immutableSegment) { _logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); return; } - // Replacing segment takes multiple steps, and particularly need to access the oldSegment. Replace segment may - // happen in two threads, i.e. the consuming thread that's committing the mutable segment and a HelixTaskExecutor - // thread that's bringing segment from ONLINE to CONSUMING when the server finds consuming thread can't commit - // the segment in time. The slower thread takes the reference of the oldSegment here, but it may get closed by - // the faster thread if not synchronized. In particular, the slower thread may iterate the primary keys in the - // oldSegment, causing seg fault. So we have to take a lock here. - // However, we can't just reuse the existing segmentLocks. Because many methods of partitionUpsertMetadataManager - // takes this lock internally, but after taking snapshot RW lock. If we take segmentLock here (before taking - // snapshot RW lock), we can get into deadlock with threads calling partitionUpsertMetadataManager's other - // methods, like removeSegment. - // Adding segment should be done by a single HelixTaskExecutor thread, but do it with lock here for simplicity - // otherwise, we'd need to double-check if oldSegmentManager is null. - Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, segmentName); - segmentLock.lock(); - try { - SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); - if (oldSegmentManager == null) { - // When adding a new segment, we should register it 'before' it is fully initialized by - // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other - // segments may be invalidated, making the queries see less valid docs than expected. We should let query - // access the new segment asap even though its validDocId bitmap is still being filled by - // partitionUpsertMetadataManager. - registerSegment(segmentName, newSegmentManager); - partitionUpsertMetadataManager.addSegment(immutableSegment); - _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); - } else { - // When replacing a segment, we should register the new segment 'after' it is fully initialized by - // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access - // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still - // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. - // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data - // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. - IndexSegment oldSegment = oldSegmentManager.getSegment(); - partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); - registerSegment(segmentName, newSegmentManager); - _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", - oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); - releaseSegment(oldSegmentManager); - } - } finally { - segmentLock.unlock(); - } - } - - @Override - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - // Cannot download consuming segment - if (zkMetadata.getStatus() == Status.IN_PROGRESS) { - return false; - } - // TODO: may support download from peer servers as well. - return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl()); - } - - void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata, - IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { - String uri = segmentZKMetadata.getDownloadUrl(); - if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) { - try { - // TODO: cleanup and consolidate the segment loading logic a bit for OFFLINE and REALTIME tables. - // https://github.com/apache/pinot/issues/9752 - downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri); - } catch (Exception e) { - _logger.warn("Download segment {} from deepstore uri {} failed.", segmentName, uri, e); - // Download from deep store failed; try to download from peer if peer download is setup for the table. - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw e; - } - } + SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); + if (oldSegmentManager == null) { + // When adding a new segment, we should register it 'before' it is fully initialized by + // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other + // segments may be invalidated, making the queries see less valid docs than expected. We should let query + // access the new segment asap even though its validDocId bitmap is still being filled by + // partitionUpsertMetadataManager. + registerSegment(segmentName, newSegmentManager); + partitionUpsertMetadataManager.addSegment(immutableSegment); + _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); } else { - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw new RuntimeException("Peer segment download not enabled for segment " + segmentName); - } - } - } - - private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) { - // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted. - // This is fine since the temporary directories are deleted when the table data manager calls init. - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile); - _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Failed to download segment {} from deep store: ", segmentName, e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); + // When replacing a segment, we should register the new segment 'after' it is fully initialized by + // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access + // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still + // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. + // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data + // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. + IndexSegment oldSegment = oldSegmentManager.getSegment(); + partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); + registerSegment(segmentName, newSegmentManager); + _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", + oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); + releaseSegment(oldSegmentManager); } } /** - * Untars the new segment and replaces the existing segment. + * Replaces the CONSUMING segment with a downloaded sealed one. */ - private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile, - File tempRootDir) - throws IOException { - File untarDir = new File(tempRootDir, segmentName); - File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, untarDir).get(0); - _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, untarDir); - File indexDir = new File(_indexDir, segmentName); - FileUtils.deleteQuietly(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - _logger.info("Replacing LLC Segment {}", segmentName); - replaceLLSegment(segmentName, indexLoadingConfig); - } - - private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig indexLoadingConfig) { - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - // Next download the segment from a randomly chosen server using configured download scheme (http or https). - SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName, () -> { - List<URI> peerServerURIs = - PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName, - _peerDownloadScheme); - Collections.shuffle(peerServerURIs); - return peerServerURIs; - }, segmentTarFile); - _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, _peerDownloadScheme, - e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } + public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata) + throws Exception { + String segmentName = zkMetadata.getSegmentName(); + _logger.info("Downloading and replacing CONSUMING segment: {} with sealed one", segmentName); + File indexDir = downloadSegment(zkMetadata); + // Get a new index loading config with latest table config and schema to load the segment + IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null); Review Comment: Good catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org