Copilot commented on code in PR #16361:
URL: https://github.com/apache/pinot/pull/16361#discussion_r2438728991
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -354,4 +399,80 @@ public void validateTaskConfigs(TableConfig tableConfig,
Schema schema, Map<Stri
}
}
}
+
+ private Map<String, List<ValidDocIdsMetadataInfo>> fetchValidDocIds(String
realtimeTableName,
+ Map<String, String> taskConfigs) {
+ // Get server to segment mappings
+ PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
+ Map<String, List<String>> serverToSegments =
+
pinotHelixResourceManager.getServerToOnlineSegmentsMapFromEV(realtimeTableName,
true);
+ BiMap<String, String> serverToEndpoints;
+ try {
+ serverToEndpoints =
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ } catch (InvalidConfigException e) {
+ throw new RuntimeException(e);
+ }
+
+ ServerSegmentMetadataReader serverSegmentMetadataReader =
+ new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
+ _clusterInfoAccessor.getConnectionManager());
+
+ // Number of segments to query per server request
+ int numSegmentsBatchPerServerRequest = Integer.parseInt(
+
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
+ String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
+
+ return
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(realtimeTableName,
serverToSegments,
+ serverToEndpoints, null, 60_000, ValidDocIdsType.SNAPSHOT.toString(),
numSegmentsBatchPerServerRequest);
Review Comment:
Magic number 60_000 (timeout in milliseconds) should be extracted as a named
constant for clarity and maintainability.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java:
##########
@@ -321,6 +410,99 @@ public void testRealtimeToOfflineSegmentsMetadataPushTask()
}
}
+ private long getMinUpsertTimestamp() {
+ ResultSetGroup resultSetGroup =
+ getPinotConnection().execute("SELECT MIN(" + UPSERT_TIME_COL_NAME + ")
FROM " + UPSERT_TABLE_NAME
+ + " OPTION(skipUpsert=true)");
+ if (resultSetGroup.getResultSetCount() > 0) {
+ return (long) resultSetGroup.getResultSet(0).getDouble(0);
+ }
+ return 0L;
+ }
+
+ private long getScore() {
+ return (long) getPinotConnection().execute(
+ "SELECT score FROM " + UPSERT_TABLE_NAME + " WHERE playerId = 101")
+ .getResultSet(0).getFloat(0);
+ }
+
+ @Test
+ public void testRealtimeToOfflineSegmentsUpsertTask()
+ throws Exception {
+ // wait for documents to be loaded: 1000 (2019-2023) + 6 (2024)
+ TestUtils.waitForCondition(aVoid ->
getCurrentCountStarResultWithoutUpsert(UPSERT_TABLE_NAME) == 1006,
+ 600_000L,
+ "Failed to load all documents for upsert");
+ assertEquals(getCurrentCountStarResult(UPSERT_TABLE_NAME), 5);
+ assertEquals(getScore(), 3692);
+
+ // wait for segments to converge first
+ waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 10_000L, 3, 2);
+
+ // Pause consumption to force segment completion, then resume to trigger
snapshot
+
sendPostRequest(_controllerRequestURLBuilder.forPauseConsumption(UPSERT_TABLE_NAME));
+ waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 600_000L, 3, 0);
+
sendPostRequest(_controllerRequestURLBuilder.forResumeConsumption(UPSERT_TABLE_NAME));
+ waitForNumQueriedSegmentsToConverge(UPSERT_TABLE_NAME, 600_000L, 5, 2);
+
+ // Run a single task iteration for upsert table
+ List<SegmentZKMetadata> segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(_offlineUpsertTableName);
+ assertTrue(segmentsZKMetadata.isEmpty());
+
+ // Query minimum timestamp directly from the data
+ long minDataTimeMs = getMinUpsertTimestamp();
+
+ // Round down to bucket boundary (365 days) to match task generator logic
+ long yearMs = 365 * 24 * 3600 * 1000L; // 365 days
Review Comment:
Duplicated value: the 365-day bucket period is computed here as yearMs, but the task config also specifies it separately as '365d'. Consider deriving both from a shared constant so the test and the task configuration cannot drift apart.
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -221,6 +290,24 @@ public void postProcess(PinotTaskConfig pinotTaskConfig) {
_expectedVersion);
}
+ /**
+ * Validates that the CRC values from segment metadata match the expected
CRC values.
+ * Copied from UpsertCompactMergeTaskExecutor for consistency.
+ */
+ void validateCRCForInputSegments(List<SegmentMetadataImpl>
segmentMetadataList,
+ List<String> expectedCRCList) {
+ for (int i = 0; i < segmentMetadataList.size(); i++) {
+ SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
+ if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
Review Comment:
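Type check in comparison: expectedCRCList contains Strings, so Objects.equals() only matches when segmentMetadata.getCrc() also returns a String. If getCrc() already returns a String, the types agree and no conversion is needed. If it returns any other type, the comparison will always be false, so the getCrc() result should be converted to a String explicitly for type safety.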
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -37,14 +40,18 @@
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import
org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskUtils;
+import
org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
Review Comment:
Possibly unused import: RecordReaderFileConfig is newly imported in place of the removed RecordReader import. Verify that RecordReaderFileConfig is actually used in this file, and remove the import if it is not.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]