wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2670884310


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -4715,6 +4739,51 @@ public QueryWorkloadManager getQueryWorkloadManager() {
     return _queryWorkloadManager;
   }
 
+  /**
+   * Retrieves the consumer watermark for a given real-time table.
+   * <p>The watermark represents the next offset to be consumed for each partition group.
+   * If the latest segment of a partition is in a DONE state, the watermark is the end offset of the completed segment.
+   * Otherwise, it is the start offset of the current consuming segment.
+   *
+   * @param tableName The name of the real-time table (without type suffix).
+   * @return A {@link WatermarkInductionResult} containing a list of watermarks for each partition group.
+   * @throws TableNotFoundException if the specified real-time table does not exist.
+   * @throws IllegalStateException if the IdealState for the table is not found.
+   */
+  public WatermarkInductionResult getConsumerWatermarks(String tableName) throws TableNotFoundException {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    if (!hasRealtimeTable(tableName)) {
+      throw new TableNotFoundException("Table " + tableNameWithType + " does not exist");
+    }
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + "exists but null tableConfig");
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    IdealState idealState = _helixAdmin
+        .getResourceIdealState(getHelixClusterName(), tableNameWithType);
+    if (idealState == null) {
+      throw new IllegalStateException("Null IdealState of the table " + tableNameWithType);
+    }
+    List<PartitionGroupConsumptionStatus> lst = _pinotLLCRealtimeSegmentManager
+        .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+    List<WatermarkInductionResult.Watermark> watermarks = lst.stream().map(status -> {
+      int seq = status.getSequenceNumber();
+      long startOffset;
+      try {
+        if ("DONE".equalsIgnoreCase(status.getStatus())) {
+          Preconditions.checkNotNull(status.getEndOffset());
+          startOffset = NumberUtils.parseLong(status.getEndOffset().toString());
+          seq++;
+        } else {

Review Comment:
   The existing `testGetConsumerWatermarks` test already covers both cases, a partition whose latest segment is DONE and one that is still in progress:
   
   ```
       PartitionGroupConsumptionStatus doneStatus = new PartitionGroupConsumptionStatus(0, 100,
           new LongMsgOffset(123), new LongMsgOffset(456), "done");
       PartitionGroupConsumptionStatus inProgressStatus =
           new PartitionGroupConsumptionStatus(1, 200, new LongMsgOffset(789), null, "IN_PROGRESS");
   ```
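
For readers following along without the Pinot sources, here is a minimal, self-contained sketch of the two branches those fixtures exercise. `WatermarkSketch`, `Status`, `Watermark`, and `induce` are hypothetical stand-ins, not Pinot classes. The sketch assumes the fixture arguments are (partition group id, sequence number, start offset, end offset, status) and that the resulting watermark carries the sequence number alongside the offset. It follows the Javadoc in the diff: a DONE segment yields its end offset with the sequence number advanced by one, and anything else yields the start offset of the consuming segment.

```java
// Hypothetical stand-ins for illustration only; these are not Pinot's classes.
public class WatermarkSketch {

  /** Simplified consumption status of one partition group. */
  static final class Status {
    final int partitionGroupId;
    final int sequenceNumber;
    final long startOffset;
    final Long endOffset; // null while the segment is still consuming
    final String status;

    Status(int partitionGroupId, int sequenceNumber, long startOffset, Long endOffset, String status) {
      this.partitionGroupId = partitionGroupId;
      this.sequenceNumber = sequenceNumber;
      this.startOffset = startOffset;
      this.endOffset = endOffset;
      this.status = status;
    }
  }

  /** Next offset to consume for one partition group (assumed shape). */
  static final class Watermark {
    final int partitionGroupId;
    final int sequenceNumber;
    final long offset;

    Watermark(int partitionGroupId, int sequenceNumber, long offset) {
      this.partitionGroupId = partitionGroupId;
      this.sequenceNumber = sequenceNumber;
      this.offset = offset;
    }
  }

  /** Derives the watermark for a single partition group. */
  static Watermark induce(Status s) {
    if ("DONE".equalsIgnoreCase(s.status)) {
      // Completed segment: resume from its end offset in the next segment.
      if (s.endOffset == null) {
        throw new IllegalStateException("DONE segment without an end offset");
      }
      return new Watermark(s.partitionGroupId, s.sequenceNumber + 1, s.endOffset);
    }
    // Consuming segment: resume from where it started.
    return new Watermark(s.partitionGroupId, s.sequenceNumber, s.startOffset);
  }

  public static void main(String[] args) {
    // Same values as the two test fixtures quoted above.
    Watermark done = induce(new Status(0, 100, 123L, 456L, "done"));
    Watermark inProgress = induce(new Status(1, 200, 789L, null, "IN_PROGRESS"));
    System.out.println(done.sequenceNumber + " " + done.offset);             // prints "101 456"
    System.out.println(inProgress.sequenceNumber + " " + inProgress.offset); // prints "200 789"
  }
}
```

Because the status comparison is case-insensitive, the lowercase `"done"` fixture takes the DONE branch, while the `null` end offset in the second fixture is never dereferenced.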



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

