Copilot commented on code in PR #17040:
URL: https://github.com/apache/pinot/pull/17040#discussion_r2442603298


##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTableAdminClient.java:
##########
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for table administration operations.
+ * Provides methods to create, update, delete, and manage Pinot tables.
+ */
+public class PinotTableAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTableAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotTableAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all tables in the cluster.
+   *
+   * @param tableType Filter by table type (realtime, offline, dimension)
+   * @param taskType Filter by task type
+   * @param sortType Sort by (name, creationTime, lastModifiedTime)
+   * @return List of table names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listTables(String tableType, String taskType,
+      String sortType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tableType != null) {
+      queryParams.put("type", tableType);
+    }
+    if (taskType != null) {
+      queryParams.put("taskType", taskType);
+    }
+    if (sortType != null) {
+      queryParams.put("sortType", sortType);
+    }
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables", queryParams, _headers);
+    return Arrays.asList(response.get("tables").asText().split(","));

Review Comment:
   Response parsing assumes a comma-separated string, but the actual Pinot API
returns a JSON array. Jackson's `asText()` returns an empty string for an array
node, so this code does not throw; it silently returns a list containing a
single empty string instead of the table names.
   ```suggestion
       JsonNode tablesNode = response.get("tables");
       if (tablesNode == null || !tablesNode.isArray()) {
         return java.util.Collections.emptyList();
       }
       List<String> tables = new java.util.ArrayList<>();
       for (JsonNode tableNode : tablesNode) {
         tables.add(tableNode.asText());
       }
       return tables;
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSchemaAdminClient.java:
##########
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for schema administration operations.
+ * Provides methods to create, update, delete, and manage Pinot schemas.
+ */
+public class PinotSchemaAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotSchemaAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotSchemaAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all schema names in the cluster.
+   *
+   * @return List of schema names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listSchemaNames()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/schemas", null, _headers);
+    return Arrays.asList(response.get("schemas").asText().split(","));

Review Comment:
   Response parsing assumes a comma-separated string, but it should handle the
JSON array returned by the actual Pinot controller API.
   ```suggestion
       JsonNode schemasNode = response.get("schemas");
       if (schemasNode == null || !schemasNode.isArray()) {
         throw new PinotAdminException("Invalid response: 'schemas' field is missing or not an array");
       }
       List<String> schemaNames = new java.util.ArrayList<>();
       for (JsonNode schemaNode : schemasNode) {
         schemaNames.add(schemaNode.asText());
       }
       return schemaNames;
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java:
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.ConnectionUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HTTP transport for Pinot admin operations.
+ * Handles communication with Pinot controller REST APIs.
+ */
+public class PinotAdminTransport implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminTransport.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Gets the ObjectMapper instance for JSON serialization/deserialization.
+   *
+   * @return ObjectMapper instance
+   */
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+
+  private final AsyncHttpClient _httpClient;
+  private final String _scheme;
+  private final Map<String, String> _defaultHeaders;
+  private final int _requestTimeoutMs;
+
+  public PinotAdminTransport(Properties properties, Map<String, String> 
authHeaders) {
+    _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
+
+    // Extract timeout configuration
+    _requestTimeoutMs = 
Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", 
"60000"));
+
+    // Extract scheme (http/https)
+    String scheme = properties.getProperty("pinot.admin.scheme", 
CommonConstants.HTTP_PROTOCOL);
+    _scheme = scheme;
+
+    // Build HTTP client
+    Builder builder = Dsl.config()
+        .setRequestTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setReadTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setConnectTimeout(Duration.ofMillis(10000)) // 10 second connect 
timeout
+        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
null));
+
+    // Configure SSL if needed
+    if (CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(scheme)) {
+      try {
+        SSLContext sslContext = SSLContext.getDefault();
+        builder.setSslContext(new 
io.netty.handler.ssl.JdkSslContext(sslContext, true,
+            io.netty.handler.ssl.ClientAuth.OPTIONAL));
+      } catch (Exception e) {
+        LOGGER.warn("Failed to configure SSL context, proceeding without SSL", 
e);
+      }
+    }
+
+    _httpClient = Dsl.asyncHttpClient(builder.build());
+    LOGGER.info("Initialized Pinot admin transport with scheme: {}, timeout: 
{}ms", scheme, _requestTimeoutMs);
+  }
+
+  /**
+   * Executes a GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeGet(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeGetAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute GET request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeGetAsync(String controllerAddress, 
String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "GET", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute GET request to " + path, throwable);
+          throw new RuntimeException("Failed to execute GET request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePost(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePostAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute POST request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePostAsync(String 
controllerAddress, String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "POST", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute POST request to " + path, throwable);
+          throw new RuntimeException("Failed to execute POST request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePut(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePutAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute PUT request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePutAsync(String controllerAddress, 
String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "PUT", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute PUT request to " + path, throwable);
+          throw new RuntimeException("Failed to execute PUT request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeDelete(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeDeleteAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute DELETE request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeDeleteAsync(String 
controllerAddress, String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "DELETE", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute DELETE request to " + path, 
throwable);
+          throw new RuntimeException("Failed to execute DELETE request", 
throwable);
+        });
+  }
+
+  private CompletableFuture<Response> executeRequestAsync(String 
controllerAddress, String path, String method,
+      Object body, Map<String, String> queryParams, Map<String, String> 
headers) {
+    String url = buildUrl(controllerAddress, path, queryParams);
+
+    BoundRequestBuilder requestBuilder = _httpClient.prepare(method, url);
+
+    // Add default headers
+    for (Map.Entry<String, String> header : _defaultHeaders.entrySet()) {
+      requestBuilder.addHeader(header.getKey(), header.getValue());
+    }
+
+    // Add request-specific headers
+    if (headers != null) {
+      for (Map.Entry<String, String> header : headers.entrySet()) {
+        requestBuilder.addHeader(header.getKey(), header.getValue());
+      }
+    }
+
+    // Set content type for requests with body
+    if (body != null) {
+      String bodyStr = body instanceof String ? (String) body : toJson(body);
+      requestBuilder.setBody(bodyStr).addHeader("Content-Type", 
"application/json");
+    }
+
+    return requestBuilder.execute().toCompletableFuture();
+  }
+
+  private String buildUrl(String controllerAddress, String path, Map<String, String> queryParams) {
+    StringBuilder url = new StringBuilder(_scheme).append("://").append(controllerAddress).append(path);
+
+    if (queryParams != null && !queryParams.isEmpty()) {
+      url.append("?");
+      boolean first = true;
+      for (Map.Entry<String, String> param : queryParams.entrySet()) {
+        if (!first) {
+          url.append("&");
+        }
+        url.append(param.getKey()).append("=").append(param.getValue());

Review Comment:
   Query parameter keys and values are appended without URL encoding. A value
containing characters such as `&`, `=`, or a space produces a malformed URL and
can inject additional query parameters into the request.
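
   A minimal sketch of one way to encode the parameters, assuming Java 10+ for
`URLEncoder.encode(String, Charset)`. `QueryStringSketch` and `encodeQuery` are
hypothetical names used for illustration and are not part of this PR. Note that
`URLEncoder` applies form encoding, so a space becomes `+` rather than `%20`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper class for illustration only; not part of the PR.
class QueryStringSketch {

  // Builds "k1=v1&k2=v2", percent-encoding every key and value.
  public static String encodeQuery(Map<String, String> queryParams) {
    StringJoiner joiner = new StringJoiner("&");
    for (Map.Entry<String, String> param : queryParams.entrySet()) {
      String key = URLEncoder.encode(param.getKey(), StandardCharsets.UTF_8);
      String value = URLEncoder.encode(param.getValue(), StandardCharsets.UTF_8);
      joiner.add(key + "=" + value);
    }
    return joiner.toString();
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps insertion order so the output is deterministic.
    Map<String, String> params = new LinkedHashMap<>();
    params.put("type", "OFFLINE");
    params.put("taskType", "a&b=c d");
    // prints type=OFFLINE&taskType=a%26b%3Dc+d
    System.out.println(encodeQuery(params));
  }
}
```

   In `buildUrl`, the same two `URLEncoder.encode` calls could wrap
`param.getKey()` and `param.getValue()` before they are appended.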



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentAdminClient.java:
##########
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for segment administration operations.
+ * Provides methods to manage and query Pinot segments.
+ */
+public class PinotSegmentAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotSegmentAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotSegmentAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all segments for a table.
+   *
+   * @param tableName Name of the table
+   * @param excludeReplacedSegments Whether to exclude replaced segments
+   * @return List of segment names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listSegments(String tableName, boolean excludeReplacedSegments)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("excludeReplacedSegments", String.valueOf(excludeReplacedSegments));
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/segments/" + tableName, queryParams, _headers);
+    return Arrays.asList(response.get("segments").asText().split(","));

Review Comment:
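   The segment listing should parse the response as a JSON array rather than
assuming a comma-separated string. As with the table listing, `asText()` on an
array node returns an empty string, so the current code cannot return the
segment names.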



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java:
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.ConnectionUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HTTP transport for Pinot admin operations.
+ * Handles communication with Pinot controller REST APIs.
+ */
+public class PinotAdminTransport implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminTransport.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Gets the ObjectMapper instance for JSON serialization/deserialization.
+   *
+   * @return ObjectMapper instance
+   */
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+
+  private final AsyncHttpClient _httpClient;
+  private final String _scheme;
+  private final Map<String, String> _defaultHeaders;
+  private final int _requestTimeoutMs;
+
+  public PinotAdminTransport(Properties properties, Map<String, String> 
authHeaders) {
+    _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
+
+    // Extract timeout configuration
+    _requestTimeoutMs = 
Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", 
"60000"));
+
+    // Extract scheme (http/https)
+    String scheme = properties.getProperty("pinot.admin.scheme", 
CommonConstants.HTTP_PROTOCOL);
+    _scheme = scheme;
+
+    // Build HTTP client
+    Builder builder = Dsl.config()
+        .setRequestTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setReadTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setConnectTimeout(Duration.ofMillis(10000)) // 10 second connect 
timeout
+        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
null));
+
+    // Configure SSL if needed
+    if (CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(scheme)) {
+      try {
+        SSLContext sslContext = SSLContext.getDefault();
+        builder.setSslContext(new 
io.netty.handler.ssl.JdkSslContext(sslContext, true,
+            io.netty.handler.ssl.ClientAuth.OPTIONAL));
+      } catch (Exception e) {
+        LOGGER.warn("Failed to configure SSL context, proceeding without SSL", 
e);
+      }
+    }
+
+    _httpClient = Dsl.asyncHttpClient(builder.build());
+    LOGGER.info("Initialized Pinot admin transport with scheme: {}, timeout: 
{}ms", scheme, _requestTimeoutMs);
+  }
+
+  /**
+   * Executes a GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeGet(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeGetAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute GET request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeGetAsync(String controllerAddress, 
String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "GET", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute GET request to " + path, throwable);
+          throw new RuntimeException("Failed to execute GET request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePost(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePostAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute POST request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePostAsync(String 
controllerAddress, String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "POST", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute POST request to " + path, throwable);
+          throw new RuntimeException("Failed to execute POST request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePut(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePutAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute PUT request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePutAsync(String controllerAddress, 
String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "PUT", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute PUT request to " + path, throwable);
+          throw new RuntimeException("Failed to execute PUT request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeDelete(String controllerAddress, String path, Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeDeleteAsync(controllerAddress, path, queryParams, headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute DELETE request to " + path, e);
+    }
+  }
+
+  /**
+   * Executes an async DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeDeleteAsync(String controllerAddress, String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "DELETE", null, queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute DELETE request to " + path, throwable);
+          throw new RuntimeException("Failed to execute DELETE request", throwable);
+        });
+  }
+
+  private CompletableFuture<Response> executeRequestAsync(String controllerAddress, String path, String method,
+      Object body, Map<String, String> queryParams, Map<String, String> headers) {
+    String url = buildUrl(controllerAddress, path, queryParams);
+
+    BoundRequestBuilder requestBuilder = _httpClient.prepare(method, url);
+
+    // Add default headers
+    for (Map.Entry<String, String> header : _defaultHeaders.entrySet()) {
+      requestBuilder.addHeader(header.getKey(), header.getValue());
+    }
+
+    // Add request-specific headers
+    if (headers != null) {
+      for (Map.Entry<String, String> header : headers.entrySet()) {
+        requestBuilder.addHeader(header.getKey(), header.getValue());
+      }
+    }
+
+    // Set content type for requests with body
+    if (body != null) {
+      String bodyStr = body instanceof String ? (String) body : toJson(body);
+      requestBuilder.setBody(bodyStr).addHeader("Content-Type", "application/json");
+    }
+
+    return requestBuilder.execute().toCompletableFuture();
+  }
+
+  private String buildUrl(String controllerAddress, String path, Map<String, String> queryParams) {
+    StringBuilder url = new StringBuilder(_scheme).append("://").append(controllerAddress).append(path);

Review Comment:
   URL construction here is vulnerable to injection. Query parameter names and
values should be URL-encoded before they are appended, so that characters such
as `&`, `=`, or `#` in a value cannot alter the request.
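
A minimal sketch of what encoded query-string construction could look like, using only the JDK. The class name `UrlBuilderSketch` and the static `buildUrl`/`encode` helpers are hypothetical and not part of the PR; the sketch also assumes Java 10+ for `URLEncoder.encode(String, Charset)` and treats a `null` value as an empty string.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for the transport's buildUrl method (illustration only).
final class UrlBuilderSketch {
  private UrlBuilderSketch() {
  }

  static String buildUrl(String scheme, String controllerAddress, String path,
      Map<String, String> queryParams) {
    StringBuilder url = new StringBuilder(scheme).append("://").append(controllerAddress).append(path);
    if (queryParams != null && !queryParams.isEmpty()) {
      // Join encoded key=value pairs with '&' and prefix the whole query with '?'.
      StringJoiner query = new StringJoiner("&", "?", "");
      for (Map.Entry<String, String> entry : queryParams.entrySet()) {
        query.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
      }
      url.append(query);
    }
    return url.toString();
  }

  // Form-style percent-encoding; assumption: null is sent as an empty string.
  static String encode(String value) {
    return value == null ? "" : URLEncoder.encode(value, StandardCharsets.UTF_8);
  }
}
```

With this approach a value such as `OFFLINE&x=1` is sent as a single parameter value instead of introducing a second parameter. Note that `URLEncoder` applies form encoding (a space becomes `+`), which is usually acceptable in a query string but is not suitable for encoding path segments.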



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotInstanceAdminClient.java:
##########
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for instance administration operations.
+ * Provides methods to create, update, delete, and manage Pinot instances.
+ */
+public class PinotInstanceAdminClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotInstanceAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotInstanceAdminClient(PinotAdminTransport transport, String controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all instances in the cluster.
+   *
+   * @return List of instance names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listInstances()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/instances", null, _headers);
+    return Arrays.asList(response.get("instances").asText().split(","));

Review Comment:
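   As with the table listing, this assumes the `instances` field is a
comma-separated string, but the Pinot API typically returns a JSON array.
Calling `asText()` on an array node does not yield its elements (in Jackson 2.x
it returns an empty string), so this parsing will not produce the instance
names from actual API responses. Iterate over the array elements instead.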



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java:
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.ConnectionUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HTTP transport for Pinot admin operations.
+ * Handles communication with Pinot controller REST APIs.
+ */
+public class PinotAdminTransport implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotAdminTransport.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Gets the ObjectMapper instance for JSON serialization/deserialization.
+   *
+   * @return ObjectMapper instance
+   */
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+
+  private final AsyncHttpClient _httpClient;
+  private final String _scheme;
+  private final Map<String, String> _defaultHeaders;
+  private final int _requestTimeoutMs;
+
+  public PinotAdminTransport(Properties properties, Map<String, String> authHeaders) {
+    _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
+
+    // Extract timeout configuration
+    _requestTimeoutMs = Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", "60000"));
+
+    // Extract scheme (http/https)
+    String scheme = properties.getProperty("pinot.admin.scheme", CommonConstants.HTTP_PROTOCOL);
+    _scheme = scheme;
+
+    // Build HTTP client
+    Builder builder = Dsl.config()
+        .setRequestTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setReadTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setConnectTimeout(Duration.ofMillis(10000)) // 10 second connect timeout
+        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", null));
+
+    // Configure SSL if needed
+    if (CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(scheme)) {
+      try {
+        SSLContext sslContext = SSLContext.getDefault();
+        builder.setSslContext(new io.netty.handler.ssl.JdkSslContext(sslContext, true,
+            io.netty.handler.ssl.ClientAuth.OPTIONAL));

Review Comment:
   Instantiating `JdkSslContext` directly bypasses Netty's own SSL
configuration path. Consider building the context with the builder or factory
methods that Netty provides (for example `SslContextBuilder.forClient()`) for
more robust SSL setup and better error handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

