Jackie-Jiang commented on a change in pull request #7200: URL: https://github.com/apache/pinot/pull/7200#discussion_r681355298
########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java ########## @@ -385,4 +385,18 @@ public static Long extractTimeValue(Comparable time) { } return null; } + + public static Comparable extractTimeValueIfPossible(Comparable time) { Review comment: We don't need this ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java ########## @@ -375,12 +383,17 @@ public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager return this; } + public Builder setUpsertComparisonColumn(String upsertComparisonColumn) { Review comment: (nit) Same ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java ########## @@ -239,9 +239,9 @@ public void removeSegment(IndexSegment segment) { public static final class RecordInfo { private final PrimaryKey _primaryKey; private final int _docId; - private final long _timestamp; + private final Comparable _timestamp; Review comment: Rename the variable ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java ########## @@ -507,9 +511,10 @@ private boolean isUpsertEnabled() { private GenericRow handleUpsert(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); - Object timeValue = row.getValue(_timeColumnName); - Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); - long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); + Object timeValue = row.getValue(_upsertComparisonColumn); Review comment: Same here ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -404,9 +407,9 @@ public boolean hasNext() { values[i] = value; } PrimaryKey primaryKey = new PrimaryKey(values); - Object timeValue = columnToReaderMap.get(_timeColumnName).getValue(_docId); + Object timeValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId); Review comment: Rename the variables to reflect the change. Same for other places ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -404,9 +407,9 @@ public boolean hasNext() { values[i] = value; } PrimaryKey primaryKey = new PrimaryKey(values); - Object timeValue = columnToReaderMap.get(_timeColumnName).getValue(_docId); + Object timeValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId); Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); - long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); + Comparable timestamp = IngestionUtils.extractTimeValueIfPossible((Comparable) timeValue); Review comment: Directly put the value instead of calling `extractTimeValue()`. We can handle general comparable values ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java ########## @@ -233,6 +240,7 @@ public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() { private String _consumerDir; private UpsertConfig.Mode _upsertMode; private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; + private String _upsertComparisonColumn; Review comment: (nit) Move it before `_partitionUpsertMetadataManager` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java ########## @@ -68,7 +69,8 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir, - UpsertConfig.Mode upsertMode, PartitionUpsertMetadataManager partitionUpsertMetadataManager) { + UpsertConfig.Mode upsertMode, PartitionUpsertMetadataManager partitionUpsertMetadataManager, + String upsertComparisonColumn) { Review comment: (nit) keep the same order as the declaration ```suggestion UpsertConfig.Mode upsertMode, String upsertComparisonColumn, PartitionUpsertMetadataManager partitionUpsertMetadataManager) { ``` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java ########## @@ -27,9 +27,9 @@ public class RecordLocation { private final IndexSegment _segment; private final int _docId; - private final long _timestamp; + private final Comparable _timestamp; Review comment: Rename the variable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org