Jackie-Jiang commented on code in PR #14250:
URL: https://github.com/apache/pinot/pull/14250#discussion_r2146206572


##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java:
##########
@@ -127,50 +130,111 @@ private TableReloadJsonResponse 
processSegmentMetadataReloadResponse(
 
   /**
    * This api takes in list of segments for which we need the metadata.
+   * This calls the server to get the metadata for all segments instead of 
making a call per segment.
    */
   public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> 
columns, Set<String> segmentsToInclude,
       int timeoutMs)
       throws InvalidConfigException, IOException {
     return getSegmentsMetadataInternal(tableNameWithType, columns, 
segmentsToInclude, timeoutMs);
   }
 
-  private JsonNode getSegmentsMetadataInternal(String tableNameWithType, 
List<String> columns,
-      Set<String> segmentsToInclude, int timeoutMs)
+  /**
+   *   Common helper used by both the new (table-level) and legacy
+   *   (segment-level) endpoints.
+   */
+  private JsonNode fetchAndAggregateMetadata(List<String> urls,
+      BiMap<String, String> endpoints,
+      boolean perSegmentJson,
+      String tableNameWithType,
+      int timeoutMs)
       throws InvalidConfigException, IOException {
-    final Map<String, List<String>> serverToSegmentsMap =
-        _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
-    BiMap<String, String> endpoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
-    ServerSegmentMetadataReader serverSegmentMetadataReader =
-        new ServerSegmentMetadataReader(_executor, _connectionManager);
 
-    // Filter segments that we need
-    for (Map.Entry<String, List<String>> serverToSegment : 
serverToSegmentsMap.entrySet()) {
-      List<String> segments = serverToSegment.getValue();
-      if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
-        segments.retainAll(segmentsToInclude);
+    CompletionServiceHelper cs =
+        new CompletionServiceHelper(_executor, _connectionManager, endpoints);
+    CompletionServiceHelper.CompletionServiceResponse resp =
+        cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson, 
timeoutMs);
+
+    if (resp._failedResponseCount == urls.size()) {
+      throw new InvalidConfigException("All requests to server instances 
failed.");
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    ObjectNode aggregatedNode = mapper.createObjectNode();
+
+    for (String body : resp._httpResponses.values()) {
+      JsonNode node = JsonUtils.stringToJsonNode(body);
+
+      // legacy returns one JSON per segment; new returns one JSON with many 
fields
+      if (perSegmentJson) {
+        String segmentName = node.get("segmentName").asText();
+        aggregatedNode.set(segmentName, node);
+      } else {
+        node.fields().forEachRemaining(entry -> 
aggregatedNode.set(entry.getKey(), entry.getValue()));
       }
     }
+    return aggregatedNode;
+  }
 
-    List<String> segmentsMetadata =
-        
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType, 
serverToSegmentsMap, endpoints,
-            columns, timeoutMs);
-    Map<String, JsonNode> response = new HashMap<>();
-    for (String segmentMetadata : segmentsMetadata) {
-      JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
-      response.put(responseJson.get("segmentName").asText(), responseJson);
+  private List<String> buildTableLevelUrls(Map<String, List<String>> 
serverToSegs,
+      BiMap<String, String> endpoints,
+      String tableNameWithType,
+      List<String> columns,
+      Set<String> segmentsFilter,
+      ServerSegmentMetadataReader reader) {
+
+    List<String> urls = new ArrayList<>(serverToSegs.size());
+    for (String server : serverToSegs.keySet()) {
+      urls.add(reader.generateTableMetadataServerURL(
+          tableNameWithType, columns, segmentsFilter, endpoints.get(server)));
     }
-    return JsonUtils.objectToJsonNode(response);
+    return urls;
   }
 
-  /**
-   * This method retrieves the full segment metadata for a given table.
-   * Currently supports only OFFLINE tables.
-   * @return a map of segmentName to its metadata
-   */
-  public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> 
columns, int timeoutMs)
+  private List<String> buildSegmentLevelUrls(Map<String, List<String>> 
serverToSegs,
+      BiMap<String, String> endpoints,
+      String tableNameWithType,
+      List<String> columns,
+      Set<String> segmentsFilter,
+      ServerSegmentMetadataReader reader) {
+
+    List<String> urls = new ArrayList<>();
+    for (Map.Entry<String, List<String>> e : serverToSegs.entrySet()) {
+      for (String segment : e.getValue()) {
+        if (segmentsFilter == null || segmentsFilter.isEmpty()
+            || segmentsFilter.contains(segment)) {
+          urls.add(reader.generateSegmentMetadataServerURL(
+              tableNameWithType, segment, columns, endpoints.get(e.getKey())));
+        }
+      }
+    }
+    return urls;
+  }
+
+  private JsonNode getSegmentsMetadataInternal(String tableNameWithType, 
List<String> columns,
+      Set<String> segmentsToInclude, int timeoutMs)
       throws InvalidConfigException, IOException {
-    return getSegmentsMetadataInternal(tableNameWithType, columns, null, 
timeoutMs);
+    Map<String, List<String>> serverToSegs =
+        _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    BiMap<String, String> endpoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegs.keySet());
+    ServerSegmentMetadataReader reader =
+        new ServerSegmentMetadataReader(_executor, _connectionManager);
+
+    // try table level endpoint first
+    try {
+      List<String> tableUrls = buildTableLevelUrls(serverToSegs, endpoints,
+          tableNameWithType, columns, segmentsToInclude, reader);
+      return fetchAndAggregateMetadata(tableUrls, endpoints, 
/*perSegmentJson=*/false,
+          tableNameWithType, timeoutMs);
+    } catch (InvalidConfigException ignore) {

Review Comment:
   Can `fetchAndAggregateMetadata()` throw other exceptions? It also declares 
`IOException`. Should we catch `Exception` here instead?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -925,8 +925,10 @@ private void deleteSegmentsInternal(String 
tableNameWithType, List<String> segme
   public String getServerMetadata(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
+      @ApiParam(value = "Segments name", allowMultiple = true) 
@QueryParam("segments")

Review Comment:
   Do you need to assign a default value to this parameter? If it can be `null`, 
then let's add a `@Nullable` annotation to it. Also, does the framework support 
directly constructing a `Set`? Do we have a test with multiple segments?
   
   ```suggestion
         @ApiParam(value = "Segments to include", allowMultiple = true) 
@QueryParam("segments")
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -506,6 +513,19 @@ private String generateStaleSegmentsServerURL(String 
tableNameWithType, String e
     return String.format("%s/tables/%s/segments/isStale", endpoint, 
tableNameWithType);
   }
 
+  private String generateSegmentsParam(Set<String> values) {
+    String paramsStr = "";
+    if (values == null || values.isEmpty()) {
+      return paramsStr;
+    }
+    List<String> params = new ArrayList<>(values.size());
+    for (String value : values) {
+      params.add(String.format("segmentsToInclude=%s", value));

Review Comment:
   Is this correct? On the server side, it seems the parameter is `segments`. Do 
you have a test for this?
   I prefer `segments` over `segmentsToInclude`.



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -402,6 +404,62 @@ public String getSegmentMetadata(
     }
   }
 
+  @GET
+  @Encoded
+  @Path("/tables/{tableName}/segments/metadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provide segments metadata", notes = "Provide segments 
metadata for the segments on server")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = 
ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", 
response = ErrorInfo.class)
+  })
+  public String getSegmentsMetadata(
+      @ApiParam(value = "Table name including type", required = true, example 
= "myTable_OFFLINE")
+      @PathParam("tableName") String tableName,
+      @ApiParam(value = "Segment names to include", allowMultiple = true) 
@QueryParam("segments")

Review Comment:
   (minor)
   ```suggestion
         @ApiParam(value = "Segments to include", allowMultiple = true) 
@QueryParam("segments")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to