Jackie-Jiang commented on a change in pull request #6113:
URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r502127911



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -224,6 +242,16 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
     Preconditions.checkNotNull(schema);
 
+    // TODO(upsert): better checking&hanlding of upsert mode/primary key change

Review comment:
      Move this part into `init()`, where you can read the table config and schema from the property store passed in. It is weird to reset these variables when adding each segment.
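
      For illustration, a minimal sketch of what that one-time initialization could look like (assumptions: `doInit()` is the init hook and `TableConfig` exposes a `getUpsertMode()` accessor; adjust to the actual API):

```java
// Minimal sketch, not the PR's code: inside RealtimeTableDataManager#doInit(),
// read the table config and schema from the property store once, instead of
// resetting the upsert state on every addSegment() call.
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
Preconditions.checkNotNull(tableConfig);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkNotNull(schema);
_upsertMode = tableConfig.getUpsertMode();           // assumed accessor; otherwise read the UpsertConfig directly
_primaryKeyColumns = schema.getPrimaryKeyColumns();
_timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
```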

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
##########
@@ -58,7 +59,17 @@ public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
   public BaseFilterOperator run() {
     FilterContext filter = _queryContext.getFilter();
     if (filter != null) {
-      return constructPhysicalOperator(filter, _queryContext.getDebugOptions());
+      BaseFilterOperator filterOperator = constructPhysicalOperator(filter, _queryContext.getDebugOptions());
+      if (_indexSegment.getValidDocIndex() != null) {
+        BaseFilterOperator validDocFilter =
+            new BitmapBasedFilterOperator(_indexSegment.getValidDocIndex().getValidDocBitmap(), false, _numDocs);
+        return FilterOperatorUtils.getAndFilterOperator(Lists.newArrayList(filterOperator, validDocFilter), _numDocs,

Review comment:
      For better performance, `Arrays.asList` avoids the extra copy that `Lists.newArrayList` makes:
   ```suggestion
        return FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator, validDocFilter), _numDocs,
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/TableUpsertMetadataManager.java
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * The manager of the upsert metadata of a table.
+ */
+@ThreadSafe
+public class TableUpsertMetadataManager {
+  private final Map<Integer, PartitionUpsertMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
+
+  public TableUpsertMetadataManager() {
+  }
+
+  public synchronized PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
+    if(!_partitionMetadataManagerMap.containsKey(partitionId)) {
+      _partitionMetadataManagerMap.put(partitionId, new PartitionUpsertMetadataManager(partitionId));
+    }
+    return _partitionMetadataManagerMap.get(partitionId);
+  }
+
+  public boolean isEmpty() {

Review comment:
      (nit) I don't think this method is useful; consider removing it.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/TableUpsertMetadataManager.java
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * The manager of the upsert metadata of a table.
+ */
+@ThreadSafe
+public class TableUpsertMetadataManager {
+  private final Map<Integer, PartitionUpsertMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
+
+  public TableUpsertMetadataManager() {
+  }
+
+  public synchronized PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
+    if(!_partitionMetadataManagerMap.containsKey(partitionId)) {
+      _partitionMetadataManagerMap.put(partitionId, new PartitionUpsertMetadataManager(partitionId));
+    }
+    return _partitionMetadataManagerMap.get(partitionId);
+  }

Review comment:
      `ConcurrentHashMap.computeIfAbsent` makes the check-and-create atomic, so the method no longer needs to be `synchronized`:
   ```suggestion
    public PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
      return _partitionMetadataManagerMap.computeIfAbsent(partitionId, PartitionUpsertMetadataManager::new);
    }
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
     return canTakeMore;
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE;
+  }
+
+  private void handleUpsert(GenericRow row, int docId) {
+    // below are upsert operations
+    PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    Object timeValue = row.getValue(_timeColumnName);
+    Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
+    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+    RecordLocation location = new RecordLocation(_segmentName, docId, timestamp);
+    // check local primary key index first
+    if (_primaryKeyIndex.containsKey(primaryKey)) {
+      RecordLocation prevLocation = _primaryKeyIndex.get(primaryKey);
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        _primaryKeyIndex.put(primaryKey, location);
+        // update validDocIndex
+        _validDocIndex.remove(prevLocation.getDocId());
+        _validDocIndex.checkAndAdd(location.getDocId());
+        LOGGER.debug(String
+            .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(),
+                location.getDocId(), primaryKey, primaryKey.hashCode()));
+      } else {
+        LOGGER.debug(
+            String.format("upsert: ignore a late-arrived record: %s, hash: %d", primaryKey, primaryKey.hashCode()));
+      }
+    } else if (_partitionUpsertMetadataManager.containsKey(primaryKey)) {
+      RecordLocation prevLocation = _partitionUpsertMetadataManager.getRecordLocation(primaryKey);
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        _partitionUpsertMetadataManager.removeRecordLocation(primaryKey);
+        _primaryKeyIndex.put(primaryKey, location);
+
+        // update validDocIndex
+        _partitionUpsertMetadataManager.getValidDocIndex(prevLocation.getSegmentName())
+            .remove(prevLocation.getDocId());
+        _validDocIndex.checkAndAdd(location.getDocId());
+      }
+    } else {
+      _primaryKeyIndex.put(primaryKey, location);
+      _validDocIndex.checkAndAdd(location.getDocId());
+    }

Review comment:
       Similarly, this part of the logic should be moved into the 
`PartitionUpsertMetadataManager` with concurrency control
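
      A hedged sketch of how that could look inside `PartitionUpsertMetadataManager`; the field and method names (`_primaryKeyToRecordLocationMap`, `upsertRecordLocation`) are illustrative, not the PR's API. `ConcurrentHashMap.compute()` runs the remapping function atomically per key, which provides the concurrency control:

```java
// Illustrative sketch only; it relies on the getValidDocIndex()/getOrCreateValidDocIndex()
// helpers already shown in this PR, with hypothetical names for the rest.
private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
    new ConcurrentHashMap<>();

public void upsertRecordLocation(PrimaryKey primaryKey, RecordLocation newLocation) {
  _primaryKeyToRecordLocationMap.compute(primaryKey, (pk, currentLocation) -> {
    if (currentLocation != null && newLocation.getTimestamp() < currentLocation.getTimestamp()) {
      // Late-arrived record: keep the current location and its valid doc
      return currentLocation;
    }
    if (currentLocation != null) {
      // New record wins: invalidate the previously valid doc for this key
      getValidDocIndex(currentLocation.getSegmentName()).remove(currentLocation.getDocId());
    }
    // Mark the new doc as valid (first occurrence of the key, or a replacement)
    getOrCreateValidDocIndex(newLocation.getSegmentName()).checkAndAdd(newLocation.getDocId());
    return newLocation;
  });
}
```

      The consuming segment would then call this one method per indexed row instead of branching on its own `_primaryKeyIndex` and the manager's map.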

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
##########
@@ -58,7 +59,17 @@ public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
   public BaseFilterOperator run() {
     FilterContext filter = _queryContext.getFilter();
     if (filter != null) {
-      return constructPhysicalOperator(filter, _queryContext.getDebugOptions());
+      BaseFilterOperator filterOperator = constructPhysicalOperator(filter, _queryContext.getDebugOptions());
+      if (_indexSegment.getValidDocIndex() != null) {

Review comment:
       (nit) Cache `_indexSegment.getValidDocIndex()` into a local variable in 
line 61
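
      For instance (sketch only; `ValidDocIndex` stands in for whatever type `getValidDocIndex()` actually returns):

```java
// Fetch the valid-doc index once and reuse the local variable below.
ValidDocIndex validDocIndex = _indexSegment.getValidDocIndex();
if (validDocIndex != null) {
  BaseFilterOperator validDocFilter =
      new BitmapBasedFilterOperator(validDocIndex.getValidDocBitmap(), false, _numDocs);
  // ... build the AND filter as in the diff above, reusing validDocIndex rather than calling the getter again
}
```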

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -266,14 +293,71 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
         manager =
            new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
                indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId),
-                _serverMetrics);
+                _serverMetrics, _tableUpsertMetadataManager);
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
       _segmentDataManagerMap.put(segmentName, manager);
      _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
     }
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && (_upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL);

Review comment:
      The `null` check is redundant (a `null` `_upsertMode` never equals either enum constant):
   ```suggestion
       return _upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -266,14 +293,71 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
         manager =
             new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
                 indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId),
-                _serverMetrics);
+                _serverMetrics, _tableUpsertMetadataManager);
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
       _segmentDataManagerMap.put(segmentName, manager);
       _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
     }
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && (_upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL);
+  }
+
+  @Override
+  public void addSegment(ImmutableSegment immutableSegment) {
+    if (isUpsertEnabled()) {
+      handleUpsert(immutableSegment);
+    }
+    super.addSegment(immutableSegment);
+  }
+
+  private void handleUpsert(ImmutableSegment immutableSegment) {
+    Preconditions.checkArgument(!_primaryKeyColumns.isEmpty(), "the primary key columns cannot be empty");
+    Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
+    for (String primaryKeyColumn : _primaryKeyColumns) {
+      columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(immutableSegment, primaryKeyColumn));
+    }
+    columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName));
+    int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
+    String segmentName = immutableSegment.getSegmentName();
+    int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionId();
+    PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+        _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
+    for (int docId = 0; docId < numTotalDocs; docId++) {
+      Object[] fields = new Object[_primaryKeyColumns.size()];
+      for (int i = 0; i < _primaryKeyColumns.size(); i++) {
+        fields[i] = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(docId);
+      }
+      PrimaryKey primaryKey = new PrimaryKey(fields);
+      Object timeValue = columnToReaderMap.get(_timeColumnName).getValue(docId);
+      Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
+      long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+      RecordLocation location = new RecordLocation(segmentName, docId, timestamp);
+      if (partitionUpsertMetadataManager.containsKey(primaryKey)) {
+        RecordLocation prevLocation = partitionUpsertMetadataManager.getRecordLocation(primaryKey);
+        // upsert
+        if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+          partitionUpsertMetadataManager.removeRecordLocation(primaryKey);
+          partitionUpsertMetadataManager.putRecordLocation(primaryKey, location);
+          partitionUpsertMetadataManager.getValidDocIndex(prevLocation.getSegmentName())
+              .remove(prevLocation.getDocId());
+          partitionUpsertMetadataManager.getOrCreateValidDocIndex(segmentName).checkAndAdd(location.getDocId());
+          LOGGER.debug(String
+              .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(),
+                  location.getDocId(), primaryKey, primaryKey.hashCode()));
+        } else {
+          LOGGER.debug(
+              String.format("upsert: ignore a late-arrived record: %s, hash: %d", primaryKey, primaryKey.hashCode()));
+        }
+      } else { // append
+        partitionUpsertMetadataManager.putRecordLocation(primaryKey, location);
+        partitionUpsertMetadataManager.getOrCreateValidDocIndex(segmentName).checkAndAdd(location.getDocId());
+      }

Review comment:
       Move this logic into the `PartitionUpsertMetadataManager` class and you 
need to add concurrency control for it. Currently this part of the code is not 
thread-safe if 2 segments are updating upsert metadata at the same time
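
      Assuming the hypothetical `upsertRecordLocation()` sketched in the earlier comment (manager-owned map plus `ConcurrentHashMap.compute()`), the per-document loop here could shrink to something like the following, with the compare-and-replace logic and its locking inside the manager:

```java
// Sketch only: upsertRecordLocation() is the hypothetical manager method from the earlier sketch.
for (int docId = 0; docId < numTotalDocs; docId++) {
  Object[] fields = new Object[_primaryKeyColumns.size()];
  for (int i = 0; i < _primaryKeyColumns.size(); i++) {
    fields[i] = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(docId);
  }
  Object timeValue = columnToReaderMap.get(_timeColumnName).getValue(docId);
  Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
  long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
  // The manager atomically decides whether this doc replaces the previous one for the key
  partitionUpsertMetadataManager.upsertRecordLocation(new PrimaryKey(fields),
      new RecordLocation(segmentName, docId, timestamp));
}
```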



