mcvsubbu commented on a change in pull request #6113:
URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r503515858



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -132,6 +146,19 @@ protected void doInit() {
     String consumerDirPath = getConsumerDir();
     File consumerDir = new File(consumerDirPath);
 
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);

Review comment:
       The table config is available at the time TableDataManager is created. 
Can you use that instead of fetching it again here?
   Also, TableDataManager is not recreated if the table config changes. How do 
you plan to address that?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -1165,6 +1172,14 @@ public 
LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
     Set<String> textIndexColumns = indexLoadingConfig.getTextIndexColumns();
     _textIndexColumns = new ArrayList<>(textIndexColumns);
 
+    PartitionUpsertMetadataManager partitionUpsertMetadataManager = null;
+    UpsertConfig.Mode upsertMode = _tableConfig.getUpsertMode();
+    if (_upsertMetadataTableManager != null && upsertMode != 
UpsertConfig.Mode.NONE) {
+      int partitionId = new LLCSegmentName(_segmentNameStr).getPartitionId();

Review comment:
       Please use the `_streamPartitionId` member variable.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata 
rowMetadata) {
     return canTakeMore;
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE;
+  }
+
+  private void handleUpsert(GenericRow row, int docId) {
+    // below are upsert operations
+    PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    Object timeValue = row.getValue(_timeColumnName);
+    Preconditions.checkArgument(timeValue instanceof Comparable, "time column 
shall be comparable");
+    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+    RecordLocation location = new RecordLocation(_segmentName, docId, 
timestamp);
+    // check local primary key index first
+    if (_primaryKeyIndex.containsKey(primaryKey)) {
+      RecordLocation prevLocation = _primaryKeyIndex.get(primaryKey);
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        _primaryKeyIndex.put(primaryKey, location);
+        // update validDocIndex
+        _validDocIndex.remove(prevLocation.getDocId());
+        _validDocIndex.checkAndAdd(location.getDocId());
+        LOGGER.debug(String
+            .format("upsert: replace old doc id %d with %d for key: %s, hash: 
%d", prevLocation.getDocId(),
+                location.getDocId(), primaryKey, primaryKey.hashCode()));
+      } else {
+        LOGGER.debug(
+            String.format("upsert: ignore a late-arrived record: %s, hash: 
%d", primaryKey, primaryKey.hashCode()));
+      }
+    } else if (_partitionUpsertMetadataManager.containsKey(primaryKey)) {
+      RecordLocation prevLocation = 
_partitionUpsertMetadataManager.getRecordLocation(primaryKey);
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        _partitionUpsertMetadataManager.removeRecordLocation(primaryKey);
+        _primaryKeyIndex.put(primaryKey, location);
+
+        // update validDocIndex
+        
_partitionUpsertMetadataManager.getValidDocIndex(prevLocation.getSegmentName())
+            .remove(prevLocation.getDocId());
+        _validDocIndex.checkAndAdd(location.getDocId());
+      }
+    } else {
+      _primaryKeyIndex.put(primaryKey, location);
+      _validDocIndex.checkAndAdd(location.getDocId());
+    }

Review comment:
       +1

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -266,14 +292,52 @@ public void addSegment(String segmentName, TableConfig 
tableConfig, IndexLoading
         manager =
             new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, 
tableConfig, this, _indexDir.getAbsolutePath(),
                 indexLoadingConfig, schema, llcSegmentName, 
_partitionIdToSemaphoreMap.get(streamPartitionId),
-                _serverMetrics);
+                _serverMetrics, _tableUpsertMetadataManager);
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
       _segmentDataManagerMap.put(segmentName, manager);
       _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
     }
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode == UpsertConfig.Mode.FULL || _upsertMode == 
UpsertConfig.Mode.PARTIAL;
+  }
+
+  @Override
+  public void addSegment(ImmutableSegment immutableSegment) {
+    if (isUpsertEnabled()) {
+      handleUpsert(immutableSegment);
+    }
+    super.addSegment(immutableSegment);
+  }
+
+  private void handleUpsert(ImmutableSegment immutableSegment) {

Review comment:
       Can we move the upsert handling to a different class? Perhaps a subclass 
of RealtimeTableDataManager?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to