Jackie-Jiang commented on code in PR #14250:
URL: https://github.com/apache/pinot/pull/14250#discussion_r2132938696


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -926,7 +926,8 @@ public String getServerMetadata(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
       @ApiParam(value = "Columns name", allowMultiple = true) 
@QueryParam("columns") @DefaultValue("")
-          List<String> columns, @Context HttpHeaders headers) {
+      List<String> columns, @ApiParam(value = "Segments name", allowMultiple = 
true) @QueryParam("segments")

Review Comment:
   (minor) Put it in the next line for readability



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -402,6 +403,62 @@ public String getSegmentMetadata(
     }
   }
 
+  @GET
+  @Encoded
+  @Path("/tables/{tableName}/segments/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provide segments metadata", notes = "Provide segments 
metadata for the segments on server")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = 
ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", 
response = ErrorInfo.class)
+  })
+  public String getSegmentsMetadata(
+      @ApiParam(value = "Table name including type", required = true, example 
= "myTable_OFFLINE")
+      @PathParam("tableName") String tableName,
+      @ApiParam(value = "Segment names to include", allowMultiple = true) 
@QueryParam("segmentsToInclude")

Review Comment:
   (minor)
   ```suggestion
         @ApiParam(value = "Segment names to include", allowMultiple = true) 
@QueryParam("segments")
   ```



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -402,6 +403,62 @@ public String getSegmentMetadata(
     }
   }
 
+  @GET
+  @Encoded
+  @Path("/tables/{tableName}/segments/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provide segments metadata", notes = "Provide segments 
metadata for the segments on server")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = 
ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", 
response = ErrorInfo.class)
+  })
+  public String getSegmentsMetadata(
+      @ApiParam(value = "Table name including type", required = true, example 
= "myTable_OFFLINE")
+      @PathParam("tableName") String tableName,
+      @ApiParam(value = "Segment names to include", allowMultiple = true) 
@QueryParam("segmentsToInclude")
+      @DefaultValue("") List<String> segmentsToInclude,
+      @ApiParam(value = "Column name", allowMultiple = true) 
@QueryParam("columns") @DefaultValue("")
+      List<String> columns, @Context HttpHeaders headers) {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    TableDataManager tableDataManager = 
ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
+    // decode columns and segments
+    List<String> decodedSegments = new ArrayList<>();
+    if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {

Review Comment:
   (minor) Use `CollectionUtils.isNotEmpty()`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -926,7 +926,8 @@ public String getServerMetadata(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
       @ApiParam(value = "Columns name", allowMultiple = true) 
@QueryParam("columns") @DefaultValue("")
-          List<String> columns, @Context HttpHeaders headers) {
+      List<String> columns, @ApiParam(value = "Segments name", allowMultiple = 
true) @QueryParam("segments")

Review Comment:
   Do we need to decode the segment names?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -118,7 +118,7 @@
  *       <li>"/segments/{tableName}/servers": get a map from server to 
segments hosted by the server</li>
  *       <li>"/segments/{tableName}/crc": get a map from segment to CRC of the 
segment (OFFLINE table only)</li>
  *       <li>"/segments/{tableName}/{segmentName}/metadata: get the metadata 
for a segment</li>
- *       <li>"/segments/{tableName}/metadata: get the metadata for all 
segments from the server</li>
+ *       <li>"/segments/{tableName}/metadata</li>

Review Comment:
   Why removing the description?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1162,7 +1163,25 @@ private JsonNode getSegmentsMetadataFromServer(String 
tableNameWithType, List<St
     TableMetadataReader tableMetadataReader =
         new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
     return tableMetadataReader
-        .getSegmentsMetadata(tableNameWithType, columns, 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+        .getSegmentsMetadata(tableNameWithType, columns,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);

Review Comment:
   (minor) Revert this change



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1162,7 +1163,25 @@ private JsonNode getSegmentsMetadataFromServer(String 
tableNameWithType, List<St
     TableMetadataReader tableMetadataReader =
         new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
     return tableMetadataReader
-        .getSegmentsMetadata(tableNameWithType, columns, 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+        .getSegmentsMetadata(tableNameWithType, columns,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);

Review Comment:
   Is this method still in use?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java:
##########
@@ -127,40 +128,108 @@ private TableReloadJsonResponse 
processSegmentMetadataReloadResponse(
 
   /**
    * This api takes in list of segments for which we need the metadata.
+   * This calls the server to get the metadata for all segments instead of 
making a call per segment.
    */
   public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> 
columns, Set<String> segmentsToInclude,
       int timeoutMs)
       throws InvalidConfigException, IOException {
     return getSegmentsMetadataInternal(tableNameWithType, columns, 
segmentsToInclude, timeoutMs);
   }
 
+  /**
+   *   Common helper used by both the new (table-level) and legacy
+   *   (segment-level) endpoints.
+   */
+  private JsonNode fetchAndAggregateMetadata(List<String> urls,
+      BiMap<String, String> endpoints,
+      boolean perSegmentJson,
+      String tableNameWithType,
+      int timeoutMs)
+      throws InvalidConfigException, IOException {
+
+    CompletionServiceHelper cs =
+        new CompletionServiceHelper(_executor, _connectionManager, endpoints);
+    CompletionServiceHelper.CompletionServiceResponse resp =
+        cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson, 
timeoutMs);
+
+    if (resp._failedResponseCount == urls.size()) {
+      throw new InvalidConfigException("All requests to server instances 
failed.");
+    }
+
+    Map<String, JsonNode> aggregated = new HashMap<>();
+    for (String body : resp._httpResponses.values()) {
+      JsonNode node = JsonUtils.stringToJsonNode(body);
+
+      // legacy returns one JSON per segment; new returns one JSON with many 
fields
+      if (perSegmentJson) {
+        aggregated.put(node.get("segmentName").asText(), node);
+      } else {
+        node.fields().forEachRemaining(e -> aggregated.put(e.getKey(), 
e.getValue()));
+      }
+    }
+    return JsonUtils.objectToJsonNode(aggregated);
+  }
+
+  private List<String> buildTableLevelUrls(Map<String, List<String>> 
serverToSegs,
+      BiMap<String, String> endpoints,
+      String tableNameWithType,
+      List<String> columns,
+      Set<String> segmentsFilter,
+      ServerSegmentMetadataReader reader) {
+
+    List<String> urls = new ArrayList<>(serverToSegs.size());
+    for (String server : serverToSegs.keySet()) {
+      urls.add(reader.generateTableMetadataServerURL(
+          tableNameWithType, columns, segmentsFilter, endpoints.get(server)));
+    }
+    return urls;
+  }
+
+  private List<String> buildSegmentLevelUrls(Map<String, List<String>> 
serverToSegs,
+      BiMap<String, String> endpoints,
+      String tableNameWithType,
+      List<String> columns,
+      Set<String> segmentsFilter,
+      ServerSegmentMetadataReader reader) {
+
+    List<String> urls = new ArrayList<>();
+    for (Map.Entry<String, List<String>> e : serverToSegs.entrySet()) {
+      for (String segment : e.getValue()) {
+        if (segmentsFilter == null || segmentsFilter.isEmpty()
+            || segmentsFilter.contains(segment)) {
+          urls.add(reader.generateSegmentMetadataServerURL(
+              tableNameWithType, segment, columns, endpoints.get(e.getKey())));
+        }
+      }
+    }
+    return urls;
+  }
+
   private JsonNode getSegmentsMetadataInternal(String tableNameWithType, 
List<String> columns,
       Set<String> segmentsToInclude, int timeoutMs)
       throws InvalidConfigException, IOException {
-    final Map<String, List<String>> serverToSegmentsMap =
+    Map<String, List<String>> serverToSegs =
         _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
     BiMap<String, String> endpoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
-    ServerSegmentMetadataReader serverSegmentMetadataReader =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegs.keySet());
+    ServerSegmentMetadataReader reader =
         new ServerSegmentMetadataReader(_executor, _connectionManager);
 
-    // Filter segments that we need
-    for (Map.Entry<String, List<String>> serverToSegment : 
serverToSegmentsMap.entrySet()) {
-      List<String> segments = serverToSegment.getValue();
-      if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
-        segments.retainAll(segmentsToInclude);
-      }
+    // try table level endpoint first
+    try {
+      List<String> tableUrls = buildTableLevelUrls(serverToSegs, endpoints,
+          tableNameWithType, columns, segmentsToInclude, reader);
+      return fetchAndAggregateMetadata(tableUrls, endpoints, 
/*perSegmentJson=*/false,
+          tableNameWithType, timeoutMs);
+    } catch (InvalidConfigException ignore) {
+      // fall through to legacy
     }
 
-    List<String> segmentsMetadata =
-        
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType, 
serverToSegmentsMap, endpoints,
-            columns, timeoutMs);
-    Map<String, JsonNode> response = new HashMap<>();
-    for (String segmentMetadata : segmentsMetadata) {
-      JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
-      response.put(responseJson.get("segmentName").asText(), responseJson);
-    }
-    return JsonUtils.objectToJsonNode(response);
+    // legacy per segment endpoint
+    List<String> segmentUrls = buildSegmentLevelUrls(serverToSegs, endpoints,
+        tableNameWithType, columns, segmentsToInclude, reader);
+    return fetchAndAggregateMetadata(segmentUrls, endpoints.inverse(), 
/*perSegmentJson=*/true,
+        tableNameWithType, timeoutMs);
   }
 
   /**

Review Comment:
   We can remove this method



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java:
##########
@@ -127,40 +128,108 @@ private TableReloadJsonResponse 
processSegmentMetadataReloadResponse(
 
   /**
    * This api takes in list of segments for which we need the metadata.
+   * This calls the server to get the metadata for all segments instead of 
making a call per segment.
    */
   public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> 
columns, Set<String> segmentsToInclude,
       int timeoutMs)
       throws InvalidConfigException, IOException {
     return getSegmentsMetadataInternal(tableNameWithType, columns, 
segmentsToInclude, timeoutMs);
   }
 
+  /**
+   *   Common helper used by both the new (table-level) and legacy
+   *   (segment-level) endpoints.
+   */
+  private JsonNode fetchAndAggregateMetadata(List<String> urls,
+      BiMap<String, String> endpoints,
+      boolean perSegmentJson,
+      String tableNameWithType,
+      int timeoutMs)
+      throws InvalidConfigException, IOException {
+
+    CompletionServiceHelper cs =
+        new CompletionServiceHelper(_executor, _connectionManager, endpoints);
+    CompletionServiceHelper.CompletionServiceResponse resp =
+        cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson, 
timeoutMs);
+
+    if (resp._failedResponseCount == urls.size()) {
+      throw new InvalidConfigException("All requests to server instances 
failed.");
+    }
+
+    Map<String, JsonNode> aggregated = new HashMap<>();

Review Comment:
   If we are going to return a json object in the end, we can probably directly 
create an `ObjectNode` here



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -926,7 +926,8 @@ public String getServerMetadata(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
       @ApiParam(value = "Columns name", allowMultiple = true) 
@QueryParam("columns") @DefaultValue("")
-          List<String> columns, @Context HttpHeaders headers) {
+      List<String> columns, @ApiParam(value = "Segments name", allowMultiple = 
true) @QueryParam("segments")

Review Comment:
   (minor) To be consistent with `getSegmentMetadata`, let's put `segments` in 
front of `columns`. Same for other method signatures



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to