Jackie-Jiang commented on a change in pull request #7200: URL: https://github.com/apache/pinot/pull/7200#discussion_r682018281
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -404,10 +406,10 @@ public boolean hasNext() { values[i] = value; } PrimaryKey primaryKey = new PrimaryKey(values); - Object timeValue = columnToReaderMap.get(_timeColumnName).getValue(_docId); - Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); - long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); - return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, _docId++, timestamp); + Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId); + Preconditions.checkArgument(upsertComparisonValue instanceof Comparable, "time column shall be comparable"); Review comment: ```suggestion Preconditions.checkState(upsertComparisonValue instanceof Comparable, "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); ``` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java ########## @@ -27,12 +27,13 @@ public class RecordLocation { private final IndexSegment _segment; private final int _docId; - private final long _timestamp; + /** value used to denote the order */ + private final Comparable _comparisonValue; - public RecordLocation(IndexSegment indexSegment, int docId, long timestamp) { + public RecordLocation(IndexSegment indexSegment, int docId, Comparable timestamp) { Review comment: ```suggestion public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) { ``` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java ########## @@ -180,7 +180,7 @@ public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, Gene // Update the record location when the new timestamp is greater than or equal to the current timestamp. Update Review comment: (nit) update the comments. Same for other places ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -319,6 +319,7 @@ private static void validateTaskConfigs(TableConfig tableConfig) { * - the primary key exists on the schema * - strict replica-group is configured for routing type * - consumer type must be low-level + * - comparison column exists Review comment: (nit) indentation ########## File path: pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java ########## @@ -70,7 +71,8 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata()) .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory) .setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode) - .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build(); + .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager) + .setUpsertComparisonColumn(comparisonColumn).build(); Review comment: (nit) move this before the metadata manager ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java ########## @@ -507,11 +511,11 @@ private boolean isUpsertEnabled() { private GenericRow handleUpsert(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); - Object timeValue = row.getValue(_timeColumnName); - Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); - long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); - return _partitionUpsertMetadataManager - .updateRecord(this, new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, timestamp), row); + Object upsertComparisonValue = row.getValue(_upsertComparisonColumn); + Preconditions.checkArgument(upsertComparisonValue instanceof Comparable, Review comment: ```suggestion Preconditions.checkState(upsertComparisonValue instanceof Comparable, "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org