Copilot commented on code in PR #17040:
URL: https://github.com/apache/pinot/pull/17040#discussion_r2442783130


##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java:
##########
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.ConnectionUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HTTP transport for Pinot admin operations.
+ * Handles communication with Pinot controller REST APIs.
+ */
+public class PinotAdminTransport implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminTransport.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Gets the ObjectMapper instance for JSON serialization/deserialization.
+   *
+   * @return ObjectMapper instance
+   */
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+
+  private final AsyncHttpClient _httpClient;
+  private final String _scheme;
+  private final Map<String, String> _defaultHeaders;
+  private final int _requestTimeoutMs;
+
+  public PinotAdminTransport(Properties properties, Map<String, String> 
authHeaders) {
+    _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
+
+    // Extract timeout configuration
+    _requestTimeoutMs = 
Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", 
"60000"));
+
+    // Extract scheme (http/https)
+    String scheme = properties.getProperty("pinot.admin.scheme", 
CommonConstants.HTTP_PROTOCOL);
+    _scheme = scheme;
+
+    // Build HTTP client
+    Builder builder = Dsl.config()
+        .setRequestTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setReadTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setConnectTimeout(Duration.ofMillis(10000)) // 10 second connect 
timeout
+        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
null));
+
+    // Configure SSL if needed
+    if (CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(scheme)) {
+      try {
+        SSLContext sslContext = SSLContext.getDefault();
+        builder.setSslContext(new 
io.netty.handler.ssl.JdkSslContext(sslContext, true,
+            io.netty.handler.ssl.ClientAuth.OPTIONAL));
+      } catch (Exception e) {
+        LOGGER.warn("Failed to configure SSL context, proceeding without SSL", 
e);
+      }
+    }
+
+    _httpClient = Dsl.asyncHttpClient(builder.build());
+    LOGGER.info("Initialized Pinot admin transport with scheme: {}, timeout: 
{}ms", scheme, _requestTimeoutMs);
+  }
+
+  /**
+   * Executes a GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeGet(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeGetAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute GET request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeGetAsync(String controllerAddress, 
String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "GET", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute GET request to " + path, throwable);
+          throw new RuntimeException("Failed to execute GET request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePost(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePostAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute POST request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePostAsync(String 
controllerAddress, String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "POST", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute POST request to " + path, throwable);
+          throw new RuntimeException("Failed to execute POST request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePut(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePutAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute PUT request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePutAsync(String controllerAddress, 
String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "PUT", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute PUT request to " + path, throwable);
+          throw new RuntimeException("Failed to execute PUT request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeDelete(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeDeleteAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute DELETE request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeDeleteAsync(String 
controllerAddress, String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "DELETE", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute DELETE request to " + path, 
throwable);
+          throw new RuntimeException("Failed to execute DELETE request", 
throwable);
+        });
+  }
+
+  private CompletableFuture<Response> executeRequestAsync(String 
controllerAddress, String path, String method,
+      Object body, Map<String, String> queryParams, Map<String, String> 
headers) {
+    String url = buildUrl(controllerAddress, path, queryParams);
+
+    BoundRequestBuilder requestBuilder = _httpClient.prepare(method, url);
+
+    // Add default headers
+    for (Map.Entry<String, String> header : _defaultHeaders.entrySet()) {
+      requestBuilder.addHeader(header.getKey(), header.getValue());
+    }
+
+    // Add request-specific headers
+    if (headers != null) {
+      for (Map.Entry<String, String> header : headers.entrySet()) {
+        requestBuilder.addHeader(header.getKey(), header.getValue());
+      }
+    }
+
+    // Set content type for requests with body
+    if (body != null) {
+      String bodyStr = body instanceof String ? (String) body : toJson(body);
+      requestBuilder.setBody(bodyStr).addHeader("Content-Type", 
"application/json");
+    }
+
+    return requestBuilder.execute().toCompletableFuture();
+  }
+
+  String buildUrl(String controllerAddress, String path, Map<String, String> 
queryParams) {
+    StringBuilder url = new 
StringBuilder(_scheme).append("://").append(controllerAddress).append(path);
+
+    if (queryParams != null && !queryParams.isEmpty()) {
+      url.append("?");
+      boolean first = true;
+      for (Map.Entry<String, String> param : queryParams.entrySet()) {
+        if (!first) {
+          url.append("&");
+        }
+        url.append(param.getKey()).append("=");

Review Comment:
   URL encoding is applied only to parameter values, not to parameter keys. 
Keys should also be URL-encoded so that special characters in a key (for 
example `&`, `=`, or spaces) cannot corrupt the query string.
   ```suggestion
           url.append(URLEncoder.encode(param.getKey(), 
StandardCharsets.UTF_8)).append("=");
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java:
##########
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.ConnectionUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HTTP transport for Pinot admin operations.
+ * Handles communication with Pinot controller REST APIs.
+ */
+public class PinotAdminTransport implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminTransport.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Gets the ObjectMapper instance for JSON serialization/deserialization.
+   *
+   * @return ObjectMapper instance
+   */
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+
+  private final AsyncHttpClient _httpClient;
+  private final String _scheme;
+  private final Map<String, String> _defaultHeaders;
+  private final int _requestTimeoutMs;
+
+  public PinotAdminTransport(Properties properties, Map<String, String> 
authHeaders) {
+    _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
+
+    // Extract timeout configuration
+    _requestTimeoutMs = 
Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", 
"60000"));
+
+    // Extract scheme (http/https)
+    String scheme = properties.getProperty("pinot.admin.scheme", 
CommonConstants.HTTP_PROTOCOL);
+    _scheme = scheme;
+
+    // Build HTTP client
+    Builder builder = Dsl.config()
+        .setRequestTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setReadTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setConnectTimeout(Duration.ofMillis(10000)) // 10 second connect 
timeout
+        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
null));
+
+    // Configure SSL if needed
+    if (CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(scheme)) {
+      try {
+        SSLContext sslContext = SSLContext.getDefault();
+        builder.setSslContext(new 
io.netty.handler.ssl.JdkSslContext(sslContext, true,
+            io.netty.handler.ssl.ClientAuth.OPTIONAL));

Review Comment:
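   SSL client authentication is set to OPTIONAL, which may allow connections 
without proper client certificates. Consider using ClientAuth.REQUIRE for 
stronger security, or making this configurable based on deployment requirements. 
Note that Netty's JdkSslContext applies the ClientAuth argument only when 
isClient is false; because this context is created with isClient = true, the 
value is ignored for outgoing connections, so please confirm whether the 
setting has any effect here.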
   ```suggestion
           // Make client authentication mode configurable, default to REQUIRE
           String clientAuthMode = 
properties.getProperty("pinot.admin.ssl.client.auth", "REQUIRE").toUpperCase();
           io.netty.handler.ssl.ClientAuth clientAuth;
           switch (clientAuthMode) {
             case "NONE":
               clientAuth = io.netty.handler.ssl.ClientAuth.NONE;
               break;
             case "OPTIONAL":
               clientAuth = io.netty.handler.ssl.ClientAuth.OPTIONAL;
               break;
             case "REQUIRE":
             default:
               clientAuth = io.netty.handler.ssl.ClientAuth.REQUIRE;
               break;
           }
           builder.setSslContext(new 
io.netty.handler.ssl.JdkSslContext(sslContext, true, clientAuth));
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminAuthentication.java:
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for handling authentication in Pinot admin operations.
+ */
+public class PinotAdminAuthentication {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminAuthentication.class);
+
+  private PinotAdminAuthentication() {
+  }
+
+  public enum AuthType {
+    NONE,
+    BASIC,
+    BEARER,
+    CUSTOM
+  }
+
+  /**
+   * Creates authentication headers for basic authentication.
+   *
+   * @param username Username
+   * @param password Password
+   * @return Authentication headers
+   */
+  public static Map<String, String> createBasicAuthHeaders(String username, 
String password) {
+    String authString = username + ":" + password;
+    String encodedAuth = 
Base64.getEncoder().encodeToString(authString.getBytes());
+    Map<String, String> headers = new HashMap<>();
+    headers.put("Authorization", "Basic " + encodedAuth);
+    LOGGER.debug("Created basic authentication headers for user: {}", 
username);

Review Comment:
   Logging the username in authentication operations could expose sensitive 
information in logs. Consider removing this debug log or making it conditional 
based on a secure debug flag.
   ```suggestion
       // LOGGER.debug("Created basic authentication headers for user: {}", username);
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java:
##########
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.ConnectionUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HTTP transport for Pinot admin operations.
+ * Handles communication with Pinot controller REST APIs.
+ */
+public class PinotAdminTransport implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminTransport.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Gets the ObjectMapper instance for JSON serialization/deserialization.
+   *
+   * @return ObjectMapper instance
+   */
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+
+  private final AsyncHttpClient _httpClient;
+  private final String _scheme;
+  private final Map<String, String> _defaultHeaders;
+  private final int _requestTimeoutMs;
+
+  public PinotAdminTransport(Properties properties, Map<String, String> 
authHeaders) {
+    _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
+
+    // Extract timeout configuration
+    _requestTimeoutMs = 
Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", 
"60000"));
+
+    // Extract scheme (http/https)
+    String scheme = properties.getProperty("pinot.admin.scheme", 
CommonConstants.HTTP_PROTOCOL);
+    _scheme = scheme;
+
+    // Build HTTP client
+    Builder builder = Dsl.config()
+        .setRequestTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setReadTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setConnectTimeout(Duration.ofMillis(10000)) // 10 second connect 
timeout
+        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
null));
+
+    // Configure SSL if needed
+    if (CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(scheme)) {
+      try {
+        SSLContext sslContext = SSLContext.getDefault();
+        builder.setSslContext(new 
io.netty.handler.ssl.JdkSslContext(sslContext, true,
+            io.netty.handler.ssl.ClientAuth.OPTIONAL));
+      } catch (Exception e) {
+        LOGGER.warn("Failed to configure SSL context, proceeding without SSL", 
e);
+      }
+    }
+
+    _httpClient = Dsl.asyncHttpClient(builder.build());
+    LOGGER.info("Initialized Pinot admin transport with scheme: {}, timeout: 
{}ms", scheme, _requestTimeoutMs);
+  }
+
+  /**
+   * Executes a GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeGet(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeGetAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute GET request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeGetAsync(String controllerAddress, 
String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "GET", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute GET request to " + path, throwable);
+          throw new RuntimeException("Failed to execute GET request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePost(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePostAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute POST request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePostAsync(String 
controllerAddress, String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "POST", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute POST request to " + path, throwable);
+          throw new RuntimeException("Failed to execute POST request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePut(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePutAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute PUT request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePutAsync(String controllerAddress, 
String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "PUT", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute PUT request to " + path, throwable);
+          throw new RuntimeException("Failed to execute PUT request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeDelete(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeDeleteAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute DELETE request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeDeleteAsync(String 
controllerAddress, String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "DELETE", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute DELETE request to " + path, 
throwable);
+          throw new RuntimeException("Failed to execute DELETE request", 
throwable);
+        });
+  }
+
+  /**
+   * Prepares and dispatches an HTTP request: default headers are added first, then the
+   * request-specific headers, then (when a body is given) the body with a JSON content type.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param method HTTP method name handed to the HTTP client
+   * @param body Request body; a String is sent as-is, any other object is serialized via toJson; may be null
+   * @param queryParams Query parameters; may be null
+   * @param headers Additional headers; may be null
+   * @return CompletableFuture with the raw HTTP response
+   */
+  private CompletableFuture<Response> executeRequestAsync(String controllerAddress, String path, String method,
+      Object body, Map<String, String> queryParams, Map<String, String> headers) {
+    BoundRequestBuilder builder = _httpClient.prepare(method, buildUrl(controllerAddress, path, queryParams));
+
+    // Default headers go on first
+    _defaultHeaders.forEach((name, value) -> builder.addHeader(name, value));
+
+    // Followed by the headers supplied for this particular request
+    if (headers != null) {
+      headers.forEach((name, value) -> builder.addHeader(name, value));
+    }
+
+    // A body is always sent as JSON text
+    if (body != null) {
+      String payload;
+      if (body instanceof String) {
+        payload = (String) body;
+      } else {
+        payload = toJson(body);
+      }
+      builder.setBody(payload);
+      builder.addHeader("Content-Type", "application/json");
+    }
+
+    return builder.execute().toCompletableFuture();
+  }
+
+  /**
+   * Builds the full request URL from the scheme, controller address, path and optional query parameters.
+   * Parameter names and values are both URL-encoded as UTF-8; a {@code null} value yields {@code name=}.
+   *
+   * @param controllerAddress Controller address, appended after the scheme separator
+   * @param path Request path, appended as-is
+   * @param queryParams Query parameters; when {@code null} or empty no query string is added
+   * @return The assembled URL string
+   */
+  String buildUrl(String controllerAddress, String path, Map<String, String> queryParams) {
+    StringBuilder url = new StringBuilder(_scheme).append("://").append(controllerAddress).append(path);
+    if (queryParams == null || queryParams.isEmpty()) {
+      return url.toString();
+    }
+
+    char separator = '?';
+    for (Map.Entry<String, String> param : queryParams.entrySet()) {
+      // Encode the name as well as the value so that reserved characters ('&', '=', '#', spaces)
+      // in either one cannot corrupt the query string. String.valueOf keeps a null key rendering
+      // as "null", as it did before, instead of failing inside the encoder.
+      url.append(separator)
+          .append(URLEncoder.encode(String.valueOf(param.getKey()), StandardCharsets.UTF_8))
+          .append('=');
+      if (param.getValue() != null) {
+        url.append(URLEncoder.encode(param.getValue(), StandardCharsets.UTF_8));
+      }
+      separator = '&';
+    }
+    return url.toString();
+  }
+
+  /**
+   * Serializes the given object to a JSON string with the shared object mapper.
+   * Any serialization failure is rethrown as a {@link RuntimeException} carrying the cause.
+   */
+  private String toJson(Object obj) {
+    final String serialized;
+    try {
+      serialized = OBJECT_MAPPER.writeValueAsString(obj);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to serialize object to JSON", e);
+    }
+    return serialized;
+  }
+
+  /**
+   * Parses a JSON array field into a List of Strings.
+   * Handles both actual JSON arrays and comma-separated strings for backward compatibility.
+   * Entries of a comma-separated string are trimmed, so {@code "a, b"} yields {@code ["a", "b"]}.
+   *
+   * @param response JSON response node
+   * @param fieldName Name of the field containing the array
+   * @return List of strings from the array field; empty when the field is a blank string
+   * @throws PinotAdminException If the response is null, or the field is missing, null, or not in
+   *     expected format
+   */
+  public List<String> parseStringArray(JsonNode response, String fieldName)
+      throws PinotAdminException {
+    if (response == null) {
+      // Report the documented checked exception instead of letting a NullPointerException escape.
+      throw new PinotAdminException("Response is null; cannot read '" + fieldName + "' field");
+    }
+    JsonNode arrayNode = response.get(fieldName);
+    if (arrayNode == null) {
+      throw new PinotAdminException("Response missing '" + fieldName + "' field");
+    }
+
+    if (arrayNode.isArray()) {
+      // Handle JSON array format
+      java.util.List<String> result = new java.util.ArrayList<>(arrayNode.size());
+      for (JsonNode element : arrayNode) {
+        result.add(element.asText());
+      }
+      return result;
+    }
+
+    if (arrayNode.isTextual()) {
+      // Handle comma-separated string format for backward compatibility
+      String text = arrayNode.asText().trim();
+      if (text.isEmpty()) {
+        return java.util.Collections.emptyList();
+      }
+      java.util.List<String> result = new java.util.ArrayList<>();
+      for (String token : text.split(",")) {
+        // Trim each entry so whitespace after a comma does not leak into the names.
+        result.add(token.trim());
+      }
+      return result;
+    }
+
+    // Any other node type (number, object, explicit JSON null, ...) is not a supported format.
+    throw new PinotAdminException("Field '" + fieldName + "' is not an array or string: " + arrayNode.getNodeType());
+  }
+
+  /**
+   * Lenient counterpart of {@code parseStringArray} intended for async call chains.
+   * When parsing reports a {@link PinotAdminException}, a warning is logged and an empty list
+   * is returned instead of propagating the exception.
+   *
+   * @param response JSON response node
+   * @param fieldName Name of the field containing the array
+   * @return List of strings from the array field, or empty list if parsing fails
+   */
+  public List<String> parseStringArraySafe(JsonNode response, String fieldName) {
+    List<String> values;
+    try {
+      values = parseStringArray(response, fieldName);
+    } catch (PinotAdminException e) {
+      LOGGER.warn("Failed to parse string array for field '{}': {}", fieldName, e.getMessage());
+      values = java.util.Collections.emptyList();
+    }
+    return values;
+  }
+
+  private JsonNode parseResponse(Response response) {
+    try {
+      int statusCode = response.getStatusCode();
+      String responseBody = response.getResponseBody();
+
+      if (statusCode >= 200 && statusCode < 300) {
+        if (responseBody == null || responseBody.trim().isEmpty()) {
+          return OBJECT_MAPPER.createObjectNode();
+        }
+        return OBJECT_MAPPER.readTree(responseBody);
+      } else {
+        // Handle specific error cases
+        if (statusCode == 401) {
+          throw new PinotAdminAuthenticationException("Authentication failed: 
" + responseBody);
+        } else if (statusCode == 403) {
+          throw new PinotAdminAuthenticationException("Access forbidden: " + 
responseBody);
+        } else if (statusCode == 404) {
+          throw new PinotAdminNotFoundException("Resource not found: " + 
responseBody);
+        } else if (statusCode >= 400 && statusCode < 500) {
+          throw new PinotAdminValidationException("Client error (status: " + 
statusCode + "): " + responseBody);
+        } else {
+          throw new PinotAdminException("HTTP request failed with status: " + 
statusCode
+              + ", body: " + responseBody);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Failed to parse response", e);
+      throw new RuntimeException("Failed to parse response", e);

Review Comment:
   The error message 'Failed to parse response' is too generic and doesn't 
provide helpful debugging information. Consider including the response status 
code, response body excerpt, or the original exception type in the error 
message.
   ```suggestion
         int statusCode = -1;
         String responseBodyExcerpt = "";
         try {
           if (response != null) {
             statusCode = response.getStatusCode();
             String body = response.getResponseBody();
             if (body != null) {
               responseBodyExcerpt = body.length() > 200 ? body.substring(0, 
200) + "..." : body;
             }
           }
         } catch (Exception inner) {
           // Ignore, use defaults
         }
         LOGGER.warn("Failed to parse response (status: {}, body excerpt: 
'{}')", statusCode, responseBodyExcerpt, e);
         throw new RuntimeException(
             "Failed to parse response (status: " + statusCode + ", body 
excerpt: '" + responseBodyExcerpt +
             "', exception: " + e.getClass().getSimpleName() + ")", e);
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentAdminClient.java:
##########
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for segment administration operations.
+ * Provides methods to manage and query Pinot segments.
+ */
+public class PinotSegmentAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotSegmentAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  /**
+   * Creates a segment admin client.
+   *
+   * @param transport Transport used to issue the controller requests
+   * @param controllerAddress Controller address passed to every transport call
+   * @param headers Headers passed to every transport call
+   */
+  public PinotSegmentAdminClient(PinotAdminTransport transport, String controllerAddress,
+      Map<String, String> headers) {
+    this._headers = headers;
+    this._controllerAddress = controllerAddress;
+    this._transport = transport;
+  }
+
+  /**
+   * Fetches the segment names of a table via GET {@code /segments/{tableName}}.
+   *
+   * @param tableName Name of the table
+   * @param excludeReplacedSegments Value sent as the {@code excludeReplacedSegments} query parameter
+   * @return Segment names read from the {@code segments} field of the response
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listSegments(String tableName, boolean excludeReplacedSegments)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName;
+    String excludeFlag = String.valueOf(excludeReplacedSegments);
+    Map<String, String> queryParams = Map.of("excludeReplacedSegments", excludeFlag);
+
+    JsonNode segmentsJson = _transport.executeGet(_controllerAddress, path, queryParams, _headers);
+    return _transport.parseStringArray(segmentsJson, "segments");
+  }
+
+  /**
+   * Convenience overload that lists the segments of a table without excluding replaced segments.
+   *
+   * @param tableName Name of the table
+   * @return List of segment names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listSegments(String tableName)
+      throws PinotAdminException {
+    final boolean excludeReplacedSegments = false;
+    return listSegments(tableName, excludeReplacedSegments);
+  }
+
+  /**
+   * Fetches the server-to-segments mapping of a table via GET {@code /segments/{tableName}/servers}.
+   *
+   * @param tableName Name of the table
+   * @return The response JSON rendered as a string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getServerToSegmentsMap(String tableName)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/servers";
+    return _transport.executeGet(_controllerAddress, path, null, _headers).toString();
+  }
+
+  /**
+   * Fetches the segment lineage of a table via GET {@code /segments/{tableName}/lineage}.
+   *
+   * @param tableName Name of the table
+   * @return Segment lineage as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String listSegmentLineage(String tableName)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/lineage";
+    return _transport.executeGet(_controllerAddress, path, null, _headers).toString();
+  }
+
+  /**
+   * Fetches the segment-to-CRC mapping of a table via GET {@code /segments/{tableName}/crc}.
+   *
+   * @param tableName Name of the table
+   * @return The {@code segmentCrcMap} field of the response converted to a map
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, String> getSegmentToCrcMap(String tableName)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/crc";
+    JsonNode crcJson = _transport.executeGet(_controllerAddress, path, null, _headers);
+    return PinotAdminTransport.getObjectMapper().convertValue(crcJson.get("segmentCrcMap"), Map.class);
+  }
+
+  /**
+   * Fetches the metadata of one segment via GET {@code /segments/{tableName}/{segmentName}/metadata}.
+   *
+   * @param tableName Name of the table
+   * @param segmentName Name of the segment
+   * @param columns Columns to request; when null or empty no {@code columns} parameter is sent
+   * @return The response converted to a map
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, Object> getSegmentMetadata(String tableName, String segmentName,
+      List<String> columns)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    boolean hasColumns = columns != null && !columns.isEmpty();
+    if (hasColumns) {
+      queryParams.put("columns", String.join(",", columns));
+    }
+
+    String path = "/segments/" + tableName + "/" + segmentName + "/metadata";
+    JsonNode metadataJson = _transport.executeGet(_controllerAddress, path, queryParams, _headers);
+    return PinotAdminTransport.getObjectMapper().convertValue(metadataJson, Map.class);
+  }
+
+  /**
+   * Requests a reset of one segment via POST {@code /segments/{tableNameWithType}/{segmentName}/reset}.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @param segmentName Name of the segment
+   * @param targetInstance Target instance to reset; when null no {@code targetInstance} parameter is sent
+   * @return The response JSON rendered as a string
+   * @throws PinotAdminException If the request fails
+   */
+  public String resetSegment(String tableNameWithType, String segmentName, String targetInstance)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (targetInstance != null) {
+      queryParams.put("targetInstance", targetInstance);
+    }
+
+    String path = "/segments/" + tableNameWithType + "/" + segmentName + "/reset";
+    return _transport.executePost(_controllerAddress, path, null, queryParams, _headers).toString();
+  }
+
+  /**
+   * Requests a reset of a table's segments via POST {@code /segments/{tableNameWithType}/reset}.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @param errorSegmentsOnly Value sent as the {@code errorSegmentsOnly} query parameter
+   * @return The response JSON rendered as a string
+   * @throws PinotAdminException If the request fails
+   */
+  public String resetSegments(String tableNameWithType, boolean errorSegmentsOnly)
+      throws PinotAdminException {
+    String path = "/segments/" + tableNameWithType + "/reset";
+    String errorOnlyFlag = String.valueOf(errorSegmentsOnly);
+    Map<String, String> queryParams = Map.of("errorSegmentsOnly", errorOnlyFlag);
+
+    return _transport.executePost(_controllerAddress, path, null, queryParams, _headers).toString();
+  }
+
+  /**
+   * Deletes one segment via DELETE {@code /segments/{tableName}/{segmentName}}.
+   *
+   * @param tableName Name of the table
+   * @param segmentName Name of the segment
+   * @param retentionPeriod Sent as the {@code retention} query parameter; omitted when null
+   * @return The response JSON rendered as a string
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteSegment(String tableName, String segmentName, String retentionPeriod)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (retentionPeriod != null) {
+      queryParams.put("retention", retentionPeriod);
+    }
+
+    String path = "/segments/" + tableName + "/" + segmentName;
+    return _transport.executeDelete(_controllerAddress, path, queryParams, _headers).toString();
+  }
+
+  /**
+   * Deletes segments of a table via DELETE {@code /segments/{tableName}}.
+   * NOTE(review): what the controller does when no {@code segmentNames} parameter is sent is not
+   * visible from this client; the API description says all segments are deleted - confirm.
+   *
+   * @param tableName Name of the table
+   * @param segmentNames Comma-separated segment names, sent as {@code segmentNames}; omitted when null
+   * @param retentionPeriod Sent as the {@code retention} query parameter; omitted when null
+   * @return The response JSON rendered as a string
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteMultipleSegments(String tableName, String segmentNames,
+      String retentionPeriod)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (segmentNames != null) {
+      queryParams.put("segmentNames", segmentNames);
+    }
+    if (retentionPeriod != null) {
+      queryParams.put("retention", retentionPeriod);
+    }
+
+    String path = "/segments/" + tableName;
+    return _transport.executeDelete(_controllerAddress, path, queryParams, _headers).toString();
+  }
+
+  /**
+   * Deletes segments by POSTing a request body to {@code /segments/{tableName}/delete}.
+   *
+   * @param tableName Name of the table
+   * @param segmentDeleteRequest Segment delete request as JSON string, sent as the request body
+   * @return The response JSON rendered as a string
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteSegments(String tableName, String segmentDeleteRequest)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/delete";
+    return _transport.executePost(_controllerAddress, path, segmentDeleteRequest, null, _headers).toString();
+  }
+
+  /**
+   * Selects segments by time range via GET {@code /segments/{tableName}/select}.
+   *
+   * @param tableName Name of the table
+   * @param startTimestampMs Sent as the {@code startTimestampMs} query parameter
+   * @param endTimestampMs Sent as the {@code endTimestampMs} query parameter
+   * @param excludeReplacedSegments Sent as the {@code excludeReplacedSegments} query parameter
+   * @return Selected segments as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String selectSegments(String tableName, long startTimestampMs, long endTimestampMs,
+      boolean excludeReplacedSegments)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/select";
+
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put("startTimestampMs", String.valueOf(startTimestampMs));
+    queryParams.put("endTimestampMs", String.valueOf(endTimestampMs));
+    queryParams.put("excludeReplacedSegments", String.valueOf(excludeReplacedSegments));
+
+    return _transport.executeGet(_controllerAddress, path, queryParams, _headers).toString();
+  }
+
+  /**
+   * Fetches metadata for a table's segments via GET {@code /segments/{tableName}/metadata}.
+   *
+   * @param tableName Name of the table
+   * @return Server metadata as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getServerMetadata(String tableName)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/metadata";
+    return _transport.executeGet(_controllerAddress, path, null, _headers).toString();
+  }
+
+  /**
+   * Fetches the stale-segment report of a table via GET {@code /segments/{tableNameWithType}/isStale}.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @return The response JSON rendered as a string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getStaleSegments(String tableNameWithType)
+      throws PinotAdminException {
+    String path = "/segments/" + tableNameWithType + "/isStale";
+    return _transport.executeGet(_controllerAddress, path, null, _headers).toString();
+  }
+
+  /**
+   * Fetches Zookeeper metadata for a table's segments via GET {@code /segments/{tableName}/zkmetadata}.
+   *
+   * @param tableName Name of the table
+   * @return The {@code zkMetadata} field of the response converted to a map
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, Map<String, String>> getZookeeperMetadata(String tableName)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/zkmetadata";
+    JsonNode zkJson = _transport.executeGet(_controllerAddress, path, null, _headers);
+    return PinotAdminTransport.getObjectMapper().convertValue(zkJson.get("zkMetadata"), Map.class);
+  }
+
+  /**
+   * Fetches the storage tiers of a table's segments via GET {@code /segments/{tableName}/tiers}.
+   *
+   * @param tableName Name of the table
+   * @return Storage tiers as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getStorageTiers(String tableName)
+      throws PinotAdminException {
+    String path = "/segments/" + tableName + "/tiers";
+    return _transport.executeGet(_controllerAddress, path, null, _headers).toString();
+  }
+
+  /**
+   * Fetches the storage tiers of one segment via GET {@code /segments/{tableName}/{segmentName}/tiers}.
+   *
+   * @param tableName Name of the table
+   * @param segmentName Name of the segment
+   * @param tableType Table type (OFFLINE or REALTIME), sent as the {@code type} query parameter;
+   *     must not be null because the parameter map is built with {@code Map.of}
+   * @return Storage tiers as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getSegmentStorageTiers(String tableName, String segmentName, String tableType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("type", tableType);
+    String path = "/segments/" + tableName + "/" + segmentName + "/tiers";
+
+    return _transport.executeGet(_controllerAddress, path, queryParams, _headers).toString();
+  }
+
+  // Async versions of key methods
+
+  /**
+   * Async variant of segment listing: GET {@code /segments/{tableName}} whose {@code segments}
+   * field is parsed leniently via {@code parseStringArraySafe}.
+   *
+   * @param tableName Name of the table
+   * @param excludeReplacedSegments Value sent as the {@code excludeReplacedSegments} query parameter
+   * @return Future with the parsed segment names
+   */
+  public CompletableFuture<List<String>> listSegmentsAsync(String tableName, boolean excludeReplacedSegments) {
+    String path = "/segments/" + tableName;
+    String excludeFlag = String.valueOf(excludeReplacedSegments);
+    Map<String, String> queryParams = Map.of("excludeReplacedSegments", excludeFlag);
+
+    return _transport.executeGetAsync(_controllerAddress, path, queryParams, _headers)
+        .thenApply(segmentsJson -> _transport.parseStringArraySafe(segmentsJson, "segments"));
+  }
+
+  /**
+   * Gets the metadata for a specific segment (async).
+   */
+  public CompletableFuture<Map<String, Object>> getSegmentMetadataAsync(String 
tableName, String segmentName,
+      List<String> columns) {
+    Map<String, String> queryParams = new HashMap<>();
+    if (columns != null && !columns.isEmpty()) {
+      queryParams.put("columns", String.join(",", columns));
+    }
+
+    return _transport.executeGetAsync(_controllerAddress, "/segments/" + 
tableName + "/" + segmentName + "/metadata",
+            queryParams, _headers)
+        .thenApply(response -> (Map<String, Object>) response);

Review Comment:
   Unsafe cast from JsonNode to Map<String, Object> without proper type 
checking. This will throw ClassCastException at runtime. Use 
ObjectMapper.convertValue() for safe conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to