Copilot commented on code in PR #16361:
URL: https://github.com/apache/pinot/pull/16361#discussion_r2442599972


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -165,31 +176,89 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
 
     SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
 
-    List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
+    // Check if this is an upsert table from the pre-computed config value
+    boolean isUpsertTable =
+        Boolean.parseBoolean(configs.getOrDefault(RealtimeToOfflineSegmentsTask.IS_UPSERT_TABLE, "false"));
+
+    // Create record readers - use CompactedPinotSegmentRecordReader for upsert tables
+    List<RecordReaderFileConfig> recordReaderFileConfigs = new ArrayList<>(numInputSegments);
     int count = 1;
-    for (File segmentDir : segmentDirs) {
-      _eventObserver.notifyProgress(_pinotTaskConfig,
-          String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments));
-      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
-      // NOTE: Do not fill null field with default value to be consistent with other record readers
-      recordReader.init(segmentDir, null, null, true);
-      recordReaders.add(recordReader);
-    }
-    List<File> outputSegmentDirs;
-    try {
-      _eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
-      SegmentProcessorFramework framework =
-          new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir);
-      outputSegmentDirs = framework.process();
-      _eventObserver.notifyProgress(pinotTaskConfig,
-          "Segment processing stats - incomplete rows:" + framework.getIncompleteRowsFound() + ", dropped rows:"
-              + framework.getSkippedRowsFound() + ", sanitized rows:" + framework.getSanitizedRowsFound());
-    } finally {
-      for (RecordReader recordReader : recordReaders) {
-        recordReader.close();
+
+    if (isUpsertTable) {
+      LOGGER.info("Processing upsert table: {}, using CompactedPinotSegmentRecordReader for {} segments",
+          realtimeTableName, numInputSegments);
+      _eventObserver.notifyProgress(pinotTaskConfig, "Creating upsert-aware record readers");
+
+      // Get segment metadata for CRC validation
+      List<SegmentMetadataImpl> segmentMetadataList = segmentDirs.stream().map(x -> {
+        try {
+          return new SegmentMetadataImpl(x);
+        } catch (Exception e) {
+          throw new RuntimeException(String.format("Error fetching segment-metadata for segmentDir: %s", x), e);
+        }
+      }).collect(Collectors.toList());
+
+      // Validate if CRC of deepstore copies is same as that in ZK of segments
+      List<String> originalSegmentCrcFromTaskGenerator =
+          List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
+      validateCRCForInputSegments(segmentMetadataList, originalSegmentCrcFromTaskGenerator);
+
+      for (int i = 0; i < segmentDirs.size(); i++) {
+        File segmentDir = segmentDirs.get(i);
+        SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
+        _eventObserver.notifyProgress(_pinotTaskConfig,
+            String.format("Creating CompactedRecordReader for: %s (%d out of %d)", segmentDir, count++,
+                numInputSegments));
+
+        // Fetch validDocID snapshot from server and get record-reader for compacted reader (hardcode SNAPSHOT like
+        // UpsertCompactMergeTask)
+        RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(
+            realtimeTableName, segmentMetadata.getName(), ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT,
+            segmentMetadata.getCrc());
+
+        if (validDocIds == null) {
+          // No valid crc match found or no validDocIds obtained from all servers
+          // Error out the task instead of silently failing so that we can track it via task-error metrics
+          String message = String.format("No validDocIds found from all servers. They either failed to download "
+                  + "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s",

Review Comment:
   The error message is harder to read than it needs to be. The string 
concatenation on lines 222-223 splits the format string into three literals and 
ends with a redundant `" + "Expected crc: %s"`. The concatenated result is still 
passed to String.format(), so the `%s` specifier is substituted as expected, but 
merging the literals into a single string keeps the message and its format 
argument easier to follow.
   ```suggestion
             String message = String.format(
                 "No validDocIds found from all servers. They either failed to download or did not match crc from segment copy obtained from deepstore / servers. Expected crc: %s",
   ```
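
   As a quick check of how the two forms behave, here is a minimal standalone 
sketch. The class name `FormatStringCheck` and the sample CRC value are 
hypothetical and not part of the PR. It formats the same message once with the 
concatenated literals and once with a single literal, so the two results can be 
compared directly.

```java
final class FormatStringCheck {
  // Format string assembled from three concatenated literals, mirroring the shape in the diff.
  static String concatenated(String crc) {
    return String.format("No validDocIds found from all servers. They either failed to download "
        + "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s", crc);
  }

  // The same message written as one literal, as in the suggestion.
  static String merged(String crc) {
    return String.format(
        "No validDocIds found from all servers. They either failed to download or did not match crc from segment copy obtained from deepstore / servers. Expected crc: %s",
        crc);
  }

  public static void main(String[] args) {
    String crc = "12345"; // hypothetical CRC value, for illustration only
    // Adjacent literals are joined before String.format() runs, so both calls see the same format string.
    System.out.println(concatenated(crc).equals(merged(crc)));
  }
}
```

   If both forms produce the same text, the change is about readability rather 
than behavior.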



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java:
##########
@@ -321,6 +410,99 @@ public void testRealtimeToOfflineSegmentsMetadataPushTask()
     }
   }
 
+  private long getMinUpsertTimestamp() {
+    ResultSetGroup resultSetGroup =
+        getPinotConnection().execute("SELECT MIN(" + UPSERT_TIME_COL_NAME + ") FROM " + UPSERT_TABLE_NAME
+            + " OPTION(skipUpsert=true)");
+    if (resultSetGroup.getResultSetCount() > 0) {
+      return (long) resultSetGroup.getResultSet(0).getDouble(0);
+    }
+    return 0L;
+  }
+
+  private long getScore() {
+    return (long) getPinotConnection().execute(
+            "SELECT score FROM " + UPSERT_TABLE_NAME + " WHERE playerId = 101")
+        .getResultSet(0).getFloat(0);
+  }
+
+  @Test
+  public void testRealtimeToOfflineSegmentsUpsertTask()
+      throws Exception {
+    // wait for documents to be loaded: 1000 (2019-2023) + 6 (2024)
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResultWithoutUpsert(UPSERT_TABLE_NAME) == 1006,
+        600_000L,
+        "Failed to load all documents for upsert");
+    assertEquals(getCurrentCountStarResult(UPSERT_TABLE_NAME), 5);
+    assertEquals(getScore(), 3692);
+
+    // wait for segments to converge first
+    waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 10_000L, 3, 2);
+
+    // Pause consumption to force segment completion, then resume to trigger snapshot
+    sendPostRequest(_controllerRequestURLBuilder.forPauseConsumption(UPSERT_TABLE_NAME));
+    waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 600_000L, 3, 0);
+    sendPostRequest(_controllerRequestURLBuilder.forResumeConsumption(UPSERT_TABLE_NAME));
+    waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 600_000L, 5, 2);
+
+    // Run a single task iteration for upsert table
+    List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineUpsertTableName);
+    assertTrue(segmentsZKMetadata.isEmpty());
+
+    // Query minimum timestamp directly from the data
+    long minDataTimeMs = getMinUpsertTimestamp();
+
+    // Round down to bucket boundary (365 days) to match task generator logic
+    long yearMs = 365 * 24 * 3600 * 1000L; // 365 days

Review Comment:
   The hardcoded calculation `365 * 24 * 3600 * 1000L` should be extracted as a 
constant since it duplicates the bucket period logic from the task 
configuration (line 256: '365d'). This would ensure consistency between the 
test's watermark calculation and the actual task behavior, and make the 
relationship explicit.
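
   One possible shape for this, as a sketch only: the names `BUCKET_DAYS`, 
`BUCKET_TIME_PERIOD`, `BUCKET_MS`, and `roundDownToBucket` are hypothetical and 
not taken from the PR. The idea is to derive both the task config string and 
the millisecond value from a single constant, so the test's watermark 
calculation cannot drift from the configured bucket period.

```java
import java.util.concurrent.TimeUnit;

final class BucketWindow {
  // Single source of truth for the bucket period (hypothetical constant names).
  static final long BUCKET_DAYS = 365L;
  // Value that would be passed to the task configuration, e.g. "365d".
  static final String BUCKET_TIME_PERIOD = BUCKET_DAYS + "d";
  // The same period in milliseconds, derived rather than hand-computed.
  static final long BUCKET_MS = TimeUnit.DAYS.toMillis(BUCKET_DAYS);

  // Rounds a timestamp down to the start of its bucket.
  // Assumes a non-negative epoch-millis value; integer division truncates toward zero.
  static long roundDownToBucket(long timeMs) {
    return (timeMs / BUCKET_MS) * BUCKET_MS;
  }

  public static void main(String[] args) {
    long sampleTimeMs = BUCKET_MS + 1234L; // hypothetical timestamp, for illustration only
    System.out.println(BUCKET_TIME_PERIOD + " -> " + BUCKET_MS + " ms");
    System.out.println(roundDownToBucket(sampleTimeMs) == BUCKET_MS);
  }
}
```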



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -221,6 +290,24 @@ public void postProcess(PinotTaskConfig pinotTaskConfig) {
         _expectedVersion);
   }
 
+  /**
+   * Validates that the CRC values from segment metadata match the expected CRC values.
+   * Copied from UpsertCompactMergeTaskExecutor for consistency.
+   */
+  void validateCRCForInputSegments(List<SegmentMetadataImpl> segmentMetadataList,
+      List<String> expectedCRCList) {
+    for (int i = 0; i < segmentMetadataList.size(); i++) {
+      SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
+      if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
+        String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+                + "from ZK: %s, crc from deepstore: %s", segmentMetadata.getName(), expectedCRCList.get(i),
+            segmentMetadata.getCrc());

Review Comment:
   The CRC comparison is fragile. Both `segmentMetadata.getCrc()` and 
`expectedCRCList.get(i)` are `String`s, and the method compares them directly 
with `Objects.equals()`. In line 203, `expectedCRCList` is created by splitting 
a comma-separated string, so its entries might contain leading/trailing 
whitespace, which would be reported as a CRC mismatch even when the values are 
numerically equal. Consider trimming the expected CRC values when splitting, or 
use `Long.parseLong()` for numeric comparison to match the approach in 
RealtimeToOfflineSegmentsTaskGenerator.java line 443 where CRC is compared as 
`long`.
   ```suggestion
         long actualCrc = Long.parseLong(segmentMetadata.getCrc());
         long expectedCrc = Long.parseLong(expectedCRCList.get(i).trim());
         if (actualCrc != expectedCrc) {
           String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
                   + "from ZK: %d, crc from deepstore: %d", segmentMetadata.getName(), expectedCrc, actualCrc);
   ```
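
   To illustrate both options, here is a minimal standalone sketch. The class 
`CrcCompare` and its helpers `parseCrcList` and `crcMatches` are hypothetical 
names, not part of the PR or of Pinot. It shows trimming entries while 
splitting the comma-separated value, and comparing CRCs numerically so that 
surrounding whitespace does not cause a mismatch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class CrcCompare {
  // Splits a comma-separated CRC list and trims whitespace around each entry.
  static List<String> parseCrcList(String csv) {
    return Arrays.stream(csv.split(",")).map(String::trim).collect(Collectors.toList());
  }

  // Compares two CRC strings as long values after trimming.
  // Long.parseLong throws NumberFormatException if either value is not numeric.
  static boolean crcMatches(String actualCrc, String expectedCrc) {
    return Long.parseLong(actualCrc.trim()) == Long.parseLong(expectedCrc.trim());
  }

  public static void main(String[] args) {
    // Hypothetical CRC values, for illustration only.
    List<String> expected = parseCrcList("123, 456 ,789");
    System.out.println(expected);
    System.out.println(crcMatches("456", " 456 "));
  }
}
```

   Note that the numeric variant fails with an exception on a non-numeric CRC, 
so whether that is acceptable depends on how the task should report malformed 
config values.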



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

