Jackie-Jiang commented on a change in pull request #7200:
URL: https://github.com/apache/pinot/pull/7200#discussion_r682018281



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -404,10 +406,10 @@ public boolean hasNext() {
               values[i] = value;
             }
             PrimaryKey primaryKey = new PrimaryKey(values);
-            Object timeValue = 
columnToReaderMap.get(_timeColumnName).getValue(_docId);
-            Preconditions.checkArgument(timeValue instanceof Comparable, "time 
column shall be comparable");
-            long timestamp = IngestionUtils.extractTimeValue((Comparable) 
timeValue);
-            return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, 
_docId++, timestamp);
+            Object upsertComparisonValue = 
columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
+            Preconditions.checkArgument(upsertComparisonValue instanceof 
Comparable, "time column shall be comparable");

Review comment:
       ```suggestion
               Preconditions.checkState(upsertComparisonValue instanceof 
Comparable, "Upsert comparison column: %s must be comparable", 
_upsertComparisonColumn);
   ```

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
##########
@@ -27,12 +27,13 @@
 public class RecordLocation {
   private final IndexSegment _segment;
   private final int _docId;
-  private final long _timestamp;
+  /** value used to denote the order */
+  private final Comparable _comparisonValue;
 
-  public RecordLocation(IndexSegment indexSegment, int docId, long timestamp) {
+  public RecordLocation(IndexSegment indexSegment, int docId, Comparable 
timestamp) {

Review comment:
       ```suggestion
     public RecordLocation(IndexSegment indexSegment, int docId, Comparable 
comparisonValue) {
   ```

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -180,7 +180,7 @@ public GenericRow updateRecord(IndexSegment segment, 
RecordInfo recordInfo, Gene
 
         // Update the record location when the new timestamp is greater than 
or equal to the current timestamp. Update

Review comment:
       (nit) update the comments. Same for other places

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -319,6 +319,7 @@ private static void validateTaskConfigs(TableConfig 
tableConfig) {
    *  - the primary key exists on the schema
    *  - strict replica-group is configured for routing type
    *  - consumer type must be low-level
+   *   - comparison column exists

Review comment:
       (nit) indentation

##########
File path: 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
##########
@@ -70,7 +71,8 @@ public static MutableSegmentImpl 
createMutableSegmentImpl(Schema schema, Set<Str
             .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
             .setMemoryManager(new 
DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
-            
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build();
+            .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
+            .setUpsertComparisonColumn(comparisonColumn).build();

Review comment:
       (nit) move this before the metadata manager

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -507,11 +511,11 @@ private boolean isUpsertEnabled() {
 
   private GenericRow handleUpsert(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
-    Object timeValue = row.getValue(_timeColumnName);
-    Preconditions.checkArgument(timeValue instanceof Comparable, "time column 
shall be comparable");
-    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
-    return _partitionUpsertMetadataManager
-        .updateRecord(this, new 
PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, timestamp), row);
+    Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
+    Preconditions.checkArgument(upsertComparisonValue instanceof Comparable,

Review comment:
       ```suggestion
     Preconditions.checkState(upsertComparisonValue instanceof Comparable, 
"Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to