chenboat commented on a change in pull request #6113: URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r500780538
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -266,14 +291,59 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId), - _serverMetrics); + _serverMetrics, _upsertMetadataTableManager); } _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName); _segmentDataManagerMap.put(segmentName, manager); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); } } + private boolean isUpsertEnabled() { + return _upsertMode != null && _upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL; + } + + @Override + public void addSegment(ImmutableSegment immutableSegment) { + if(isUpsertEnabled()) { + handleUpsert(immutableSegment); + } + super.addSegment(immutableSegment); + } + + private void handleUpsert(ImmutableSegment immutableSegment) { + Preconditions.checkArgument(!_primaryKeyColumns.isEmpty(), "the primary key columns cannot be empty"); + Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>(); + for (String primaryKeyColumn : _primaryKeyColumns) { + columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(immutableSegment, primaryKeyColumn)); + } + columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName)); + int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs(); + + // upsert metadata of the current segment + Map<PrimaryKey, RecordLocation> primaryKeyIndex = new HashMap<>(); + ThreadSafeMutableRoaringBitmap validDocIndex = new ThreadSafeMutableRoaringBitmap(); + + String segmentName = immutableSegment.getSegmentName(); + int partitionId = new 
LLCSegmentName(immutableSegment.getSegmentName()).getPartitionId(); + for (int docId = 0; docId < numTotalDocs; docId++) { Review comment: It is pretty expensive to loop through all the records in a segment and apply the upsert handling to each record. We need to think this through, and ideally benchmark the performance of this addSegment() method. This method is used when a segment transitions from OFFLINE to ONLINE state — which means it is invoked when a segment is loaded during server startup, OR when a server downloads the segment from another server in LLC. So this will slow down both server startup and segment download operations. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org