Copilot commented on code in PR #16361:
URL: https://github.com/apache/pinot/pull/16361#discussion_r2442599972
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -165,31 +176,89 @@ protected List<SegmentConversionResult>
convert(PinotTaskConfig pinotTaskConfig,
SegmentProcessorConfig segmentProcessorConfig =
segmentProcessorConfigBuilder.build();
- List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
+ // Check if this is an upsert table from the pre-computed config value
+ boolean isUpsertTable =
+
Boolean.parseBoolean(configs.getOrDefault(RealtimeToOfflineSegmentsTask.IS_UPSERT_TABLE,
"false"));
+
+ // Create record readers - use CompactedPinotSegmentRecordReader for
upsert tables
+ List<RecordReaderFileConfig> recordReaderFileConfigs = new
ArrayList<>(numInputSegments);
int count = 1;
- for (File segmentDir : segmentDirs) {
- _eventObserver.notifyProgress(_pinotTaskConfig,
- String.format("Creating RecordReader for: %s (%d out of %d)",
segmentDir, count++, numInputSegments));
- PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
- // NOTE: Do not fill null field with default value to be consistent with
other record readers
- recordReader.init(segmentDir, null, null, true);
- recordReaders.add(recordReader);
- }
- List<File> outputSegmentDirs;
- try {
- _eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
- SegmentProcessorFramework framework =
- new SegmentProcessorFramework(recordReaders, segmentProcessorConfig,
workingDir);
- outputSegmentDirs = framework.process();
- _eventObserver.notifyProgress(pinotTaskConfig,
- "Segment processing stats - incomplete rows:" +
framework.getIncompleteRowsFound() + ", dropped rows:"
- + framework.getSkippedRowsFound() + ", sanitized rows:" +
framework.getSanitizedRowsFound());
- } finally {
- for (RecordReader recordReader : recordReaders) {
- recordReader.close();
+
+ if (isUpsertTable) {
+ LOGGER.info("Processing upsert table: {}, using
CompactedPinotSegmentRecordReader for {} segments",
+ realtimeTableName, numInputSegments);
+ _eventObserver.notifyProgress(pinotTaskConfig, "Creating upsert-aware
record readers");
+
+ // Get segment metadata for CRC validation
+ List<SegmentMetadataImpl> segmentMetadataList =
segmentDirs.stream().map(x -> {
+ try {
+ return new SegmentMetadataImpl(x);
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Error fetching
segment-metadata for segmentDir: %s", x), e);
+ }
+ }).collect(Collectors.toList());
+
+ // Validate if CRC of deepstore copies is same as that in ZK of segments
+ List<String> originalSegmentCrcFromTaskGenerator =
+
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
+ validateCRCForInputSegments(segmentMetadataList,
originalSegmentCrcFromTaskGenerator);
+
+ for (int i = 0; i < segmentDirs.size(); i++) {
+ File segmentDir = segmentDirs.get(i);
+ SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
+ _eventObserver.notifyProgress(_pinotTaskConfig,
+ String.format("Creating CompactedRecordReader for: %s (%d out of
%d)", segmentDir, count++,
+ numInputSegments));
+
+ // Fetch validDocID snapshot from server and get record-reader for
compacted reader (hardcode SNAPSHOT like
+ // UpsertCompactMergeTask)
+ RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(
+ realtimeTableName, segmentMetadata.getName(),
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT,
+ segmentMetadata.getCrc());
+
+ if (validDocIds == null) {
+ // No valid crc match found or no validDocIds obtained from all
servers
+ // Error out the task instead of silently failing so that we can
track it via task-error metrics
+ String message = String.format("No validDocIds found from all
servers. They either failed to download "
+ + "or did not match crc from segment copy obtained from
deepstore / servers. " + "Expected crc: %s",
Review Comment:
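The error-message construction is hard to follow. Lines 222–223 build the
format string by concatenating several literals, including a needless split
between `"... servers. "` and `"Expected crc: %s"`. This makes it easy to miss
that the `%s` specifier must be supplied by the surrounding `String.format()`
call. Pass a single, unsplit format string to `String.format()`, or use plain
string concatenation consistently, so the message and its specifier read as
one unit.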
```suggestion
String message = String.format(
"No validDocIds found from all servers. They either failed to
download or did not match crc from segment copy obtained from deepstore /
servers. Expected crc: %s",
```
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java:
##########
@@ -321,6 +410,99 @@ public void testRealtimeToOfflineSegmentsMetadataPushTask()
}
}
+ private long getMinUpsertTimestamp() {
+ ResultSetGroup resultSetGroup =
+ getPinotConnection().execute("SELECT MIN(" + UPSERT_TIME_COL_NAME + ")
FROM " + UPSERT_TABLE_NAME
+ + " OPTION(skipUpsert=true)");
+ if (resultSetGroup.getResultSetCount() > 0) {
+ return (long) resultSetGroup.getResultSet(0).getDouble(0);
+ }
+ return 0L;
+ }
+
+ private long getScore() {
+ return (long) getPinotConnection().execute(
+ "SELECT score FROM " + UPSERT_TABLE_NAME + " WHERE playerId = 101")
+ .getResultSet(0).getFloat(0);
+ }
+
+ @Test
+ public void testRealtimeToOfflineSegmentsUpsertTask()
+ throws Exception {
+ // wait for documents to be loaded: 1000 (2019-2023) + 6 (2024)
+ TestUtils.waitForCondition(aVoid ->
getCurrentCountStarResultWithoutUpsert(UPSERT_TABLE_NAME) == 1006,
+ 600_000L,
+ "Failed to load all documents for upsert");
+ assertEquals(getCurrentCountStarResult(UPSERT_TABLE_NAME), 5);
+ assertEquals(getScore(), 3692);
+
+ // wait for segments to converge first
+ waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 10_000L, 3, 2);
+
+ // Pause consumption to force segment completion, then resume to trigger
snapshot
+
sendPostRequest(_controllerRequestURLBuilder.forPauseConsumption(UPSERT_TABLE_NAME));
+ waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 600_000L, 3, 0);
+
sendPostRequest(_controllerRequestURLBuilder.forResumeConsumption(UPSERT_TABLE_NAME));
+ waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 600_000L, 5, 2);
+
+ // Run a single task iteration for upsert table
+ List<SegmentZKMetadata> segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(_offlineUpsertTableName);
+ assertTrue(segmentsZKMetadata.isEmpty());
+
+ // Query minimum timestamp directly from the data
+ long minDataTimeMs = getMinUpsertTimestamp();
+
+ // Round down to bucket boundary (365 days) to match task generator logic
+ long yearMs = 365 * 24 * 3600 * 1000L; // 365 days
Review Comment:
The hard-coded calculation `365 * 24 * 3600 * 1000L` should be extracted into a
named constant. It duplicates the bucket period from the task configuration
(line 256: `'365d'`). Defining it once, for example as
`TimeUnit.DAYS.toMillis(365)`, keeps the test's watermark calculation
consistent with the actual task behavior and makes the relationship explicit.
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -221,6 +290,24 @@ public void postProcess(PinotTaskConfig pinotTaskConfig) {
_expectedVersion);
}
+ /**
+ * Validates that the CRC values from segment metadata match the expected
CRC values.
+ * Copied from UpsertCompactMergeTaskExecutor for consistency.
+ */
+ void validateCRCForInputSegments(List<SegmentMetadataImpl>
segmentMetadataList,
+ List<String> expectedCRCList) {
+ for (int i = 0; i < segmentMetadataList.size(); i++) {
+ SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
+ if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
+ String message = String.format("Crc mismatched between ZK and
deepstore copy of segment: %s. Expected crc "
+ + "from ZK: %s, crc from deepstore: %s",
segmentMetadata.getName(), expectedCRCList.get(i),
+ segmentMetadata.getCrc());
Review Comment:
The CRC comparison is fragile. Both `segmentMetadata.getCrc()` and
`expectedCRCList.get(i)` are `String`s, and the method compares them directly
with `Objects.equals`. However, `expectedCRCList` is built in line 203 by
splitting a comma-separated string, so its entries may carry leading or
trailing whitespace, which would cause spurious mismatches. Consider trimming
the expected CRC values when splitting. Alternatively, parse both sides with
`Long.parseLong()` and compare them numerically, matching the approach in
RealtimeToOfflineSegmentsTaskGenerator.java line 443, where the CRC is compared
as a `long`.
```suggestion
long actualCrc = Long.parseLong(segmentMetadata.getCrc());
long expectedCrc = Long.parseLong(expectedCRCList.get(i).trim());
if (actualCrc != expectedCrc) {
String message = String.format("Crc mismatched between ZK and
deepstore copy of segment: %s. Expected crc "
+ "from ZK: %d, crc from deepstore: %d",
segmentMetadata.getName(), expectedCrc, actualCrc);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]