yupeng9 commented on a change in pull request #6113: URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r503668532
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -132,6 +146,19 @@ protected void doInit() { String consumerDirPath = getConsumerDir(); File consumerDir = new File(consumerDirPath); + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); Review comment: Hmm, I don't see a way to access it other than through `_propertyStore`. Can you be more specific? Also, I'm not sure the upsert config can change dynamically: switching between partial and full upsert seems like a disruptive change. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -266,14 +292,52 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId), - _serverMetrics); + _serverMetrics, _tableUpsertMetadataManager); } _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName); _segmentDataManagerMap.put(segmentName, manager); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); } } + private boolean isUpsertEnabled() { + return _upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL; + } + + @Override + public void addSegment(ImmutableSegment immutableSegment) { + if (isUpsertEnabled()) { + handleUpsert(immutableSegment); + } + super.addSegment(immutableSegment); + } + + private void handleUpsert(ImmutableSegment immutableSegment) { Review comment: Are you suggesting creating a `RealTimeUpsertTableDataManager` in `TableDataManagerProvider.getTableDataManager`? That's an option for me, though I feel upsert would be better built as a first-class citizen of the realtime table. @Jackie-Jiang what do you think? 
########## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ########## @@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) { return canTakeMore; } + private boolean isUpsertEnabled() { + return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE; + } + + private void handleUpsert(GenericRow row, int docId) { + // below are upsert operations + PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); + Object timeValue = row.getValue(_timeColumnName); + Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); + long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); + RecordLocation location = new RecordLocation(_segmentName, docId, timestamp); + // check local primary key index first + if (_primaryKeyIndex.containsKey(primaryKey)) { + RecordLocation prevLocation = _primaryKeyIndex.get(primaryKey); + if (location.getTimestamp() >= prevLocation.getTimestamp()) { + _primaryKeyIndex.put(primaryKey, location); + // update validDocIndex + _validDocIndex.remove(prevLocation.getDocId()); + _validDocIndex.checkAndAdd(location.getDocId()); + LOGGER.debug(String + .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(), + location.getDocId(), primaryKey, primaryKey.hashCode())); + } else { + LOGGER.debug( + String.format("upsert: ignore a late-arrived record: %s, hash: %d", primaryKey, primaryKey.hashCode())); + } + } else if (_partitionUpsertMetadataManager.containsKey(primaryKey)) { + RecordLocation prevLocation = _partitionUpsertMetadataManager.getRecordLocation(primaryKey); + if (location.getTimestamp() >= prevLocation.getTimestamp()) { + _partitionUpsertMetadataManager.removeRecordLocation(primaryKey); + _primaryKeyIndex.put(primaryKey, location); + + // update validDocIndex + _partitionUpsertMetadataManager.getValidDocIndex(prevLocation.getSegmentName()) + 
.remove(prevLocation.getDocId()); + _validDocIndex.checkAndAdd(location.getDocId()); + } + } else { + _primaryKeyIndex.put(primaryKey, location); + _validDocIndex.checkAndAdd(location.getDocId()); + } Review comment: okay, let me refactor this a bit. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java ########## @@ -1165,6 +1172,14 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, Set<String> textIndexColumns = indexLoadingConfig.getTextIndexColumns(); _textIndexColumns = new ArrayList<>(textIndexColumns); + PartitionUpsertMetadataManager partitionUpsertMetadataManager = null; + UpsertConfig.Mode upsertMode = _tableConfig.getUpsertMode(); + if (_upsertMetadataTableManager != null && upsertMode != UpsertConfig.Mode.NONE) { + int partitionId = new LLCSegmentName(_segmentNameStr).getPartitionId(); Review comment: Good point. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org