This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d2629743eaa feat: Add comprehensive Pinot Admin Client (#17040)
d2629743eaa is described below

commit d2629743eaa8675e5464416529ccc24acb212c7f
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Oct 24 18:08:43 2025 -0700

    feat: Add comprehensive Pinot Admin Client (#17040)
    
    - Add PinotAdminClient with full controller API coverage
    - Implement 6 service clients: Table, Schema, Instance, Segment, Tenant, 
Task
    - Add authentication support (Basic, Bearer, Custom)
    - Include comprehensive error handling with specific exception types
    - Support both synchronous and asynchronous operations
    - Add complete documentation and usage examples
    - Fix build issues and ensure code quality compliance
    
    This provides a complete programmatic interface for Pinot cluster 
administration,
    eliminating the need for direct HTTP API calls to the controller.
---
 .../client/admin/PinotAdminAuthentication.java     | 139 +++++++
 .../admin/PinotAdminAuthenticationException.java   |  33 ++
 .../pinot/client/admin/PinotAdminClient.java       | 196 +++++++++
 .../client/admin/PinotAdminClientExample.java      | 143 +++++++
 .../pinot/client/admin/PinotAdminException.java    |  37 ++
 .../client/admin/PinotAdminNotFoundException.java  |  33 ++
 .../pinot/client/admin/PinotAdminTransport.java    | 449 +++++++++++++++++++++
 .../admin/PinotAdminValidationException.java       |  33 ++
 .../client/admin/PinotInstanceAdminClient.java     | 255 ++++++++++++
 .../pinot/client/admin/PinotSchemaAdminClient.java | 231 +++++++++++
 .../client/admin/PinotSegmentAdminClient.java      | 384 ++++++++++++++++++
 .../pinot/client/admin/PinotTableAdminClient.java  | 372 +++++++++++++++++
 .../pinot/client/admin/PinotTaskAdminClient.java   | 329 +++++++++++++++
 .../pinot/client/admin/PinotTenantAdminClient.java | 370 +++++++++++++++++
 .../java/org/apache/pinot/client/admin/README.md   | 336 +++++++++++++++
 .../pinot/client/admin/PinotAdminClientTest.java   | 148 +++++++
 16 files changed, 3488 insertions(+)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminAuthentication.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminAuthentication.java
new file mode 100644
index 00000000000..fc7f6c085ce
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminAuthentication.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for handling authentication in Pinot admin operations.
+ */
+public class PinotAdminAuthentication {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminAuthentication.class);
+
+  private PinotAdminAuthentication() {
+  }
+
+  public enum AuthType {
+    NONE,
+    BASIC,
+    BEARER,
+    CUSTOM
+  }
+
+  /**
+   * Creates authentication headers for basic authentication.
+   *
+   * @param username Username
+   * @param password Password
+   * @return Authentication headers
+   */
+  public static Map<String, String> createBasicAuthHeaders(String username, 
String password) {
+    String authString = username + ":" + password;
+    String encodedAuth = 
Base64.getEncoder().encodeToString(authString.getBytes());
+    Map<String, String> headers = new HashMap<>();
+    headers.put("Authorization", "Basic " + encodedAuth);
+    return headers;
+  }
+
+  /**
+   * Creates authentication headers for bearer token authentication.
+   *
+   * @param token Bearer token
+   * @return Authentication headers
+   */
+  public static Map<String, String> createBearerAuthHeaders(String token) {
+    Map<String, String> headers = new HashMap<>();
+    headers.put("Authorization", "Bearer " + token);
+    LOGGER.debug("Created bearer authentication headers");
+    return headers;
+  }
+
+  /**
+   * Creates custom authentication headers.
+   *
+   * @param headers Custom authentication headers
+   * @return Authentication headers
+   */
+  public static Map<String, String> createCustomAuthHeaders(Map<String, 
String> headers) {
+    LOGGER.debug("Created custom authentication headers");
+    return new HashMap<>(headers);
+  }
+
+  /**
+   * Validates authentication configuration.
+   *
+   * @param authType Authentication type
+   * @param authConfig Authentication configuration
+   * @throws PinotAdminAuthenticationException If authentication configuration 
is invalid
+   */
+  public static void validateAuthConfig(AuthType authType, Map<String, String> 
authConfig)
+      throws PinotAdminAuthenticationException {
+    switch (authType) {
+      case BASIC:
+        if (!authConfig.containsKey("username") || 
!authConfig.containsKey("password")) {
+          throw new PinotAdminAuthenticationException("Basic authentication 
requires username and password");
+        }
+        break;
+      case BEARER:
+        if (!authConfig.containsKey("token")) {
+          throw new PinotAdminAuthenticationException("Bearer authentication 
requires token");
+        }
+        break;
+      case CUSTOM:
+        if (authConfig.isEmpty()) {
+          throw new PinotAdminAuthenticationException("Custom authentication 
requires at least one header");
+        }
+        break;
+      case NONE:
+      default:
+        // No validation needed for none auth type
+        break;
+    }
+  }
+
+  /**
+   * Creates authentication headers based on the specified type and 
configuration.
+   *
+   * @param authType Authentication type
+   * @param authConfig Authentication configuration
+   * @return Authentication headers
+   * @throws PinotAdminAuthenticationException If authentication configuration 
is invalid
+   */
+  public static Map<String, String> createAuthHeaders(AuthType authType, 
Map<String, String> authConfig)
+      throws PinotAdminAuthenticationException {
+    validateAuthConfig(authType, authConfig);
+
+    switch (authType) {
+      case BASIC:
+        return createBasicAuthHeaders(authConfig.get("username"), 
authConfig.get("password"));
+      case BEARER:
+        return createBearerAuthHeaders(authConfig.get("token"));
+      case CUSTOM:
+        return createCustomAuthHeaders(authConfig);
+      case NONE:
+      default:
+        return Map.of();
+    }
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminAuthenticationException.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminAuthenticationException.java
new file mode 100644
index 00000000000..3d7279c77e1
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminAuthenticationException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+/**
+ * Exception thrown when authentication fails for admin operations.
+ */
+public class PinotAdminAuthenticationException extends PinotAdminException {
+
+  public PinotAdminAuthenticationException(String message) {
+    super(message);
+  }
+
+  public PinotAdminAuthenticationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
new file mode 100644
index 00000000000..701040111f4
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.client.PinotClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Main admin client for Pinot controller operations.
+ * Provides access to all administrative APIs for managing Pinot clusters.
+ */
+public class PinotAdminClient implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  // Service clients
+  private PinotTableAdminClient _tableClient;
+  private PinotSchemaAdminClient _schemaClient;
+  private PinotInstanceAdminClient _instanceClient;
+  private PinotSegmentAdminClient _segmentClient;
+  private PinotTenantAdminClient _tenantClient;
+  private PinotTaskAdminClient _taskClient;
+
+  /**
+   * Creates a PinotAdminClient with the specified controller address.
+   *
+   * @param controllerAddress The address of the Pinot controller (e.g., 
"localhost:9000")
+   * @throws PinotClientException If the client cannot be initialized
+   */
+  public PinotAdminClient(String controllerAddress)
+      throws PinotClientException {
+    this(controllerAddress, new Properties());
+  }
+
+  /**
+   * Creates a PinotAdminClient with the specified controller address and 
properties.
+   *
+   * @param controllerAddress The address of the Pinot controller (e.g., 
"localhost:9000")
+   * @param properties Configuration properties for the client
+   * @throws PinotClientException If the client cannot be initialized
+   */
+  public PinotAdminClient(String controllerAddress, Properties properties)
+      throws PinotClientException {
+    this(controllerAddress, properties, null);
+  }
+
+  /**
+   * Creates a PinotAdminClient with the specified controller address, 
properties, and authentication headers.
+   *
+   * @param controllerAddress The address of the Pinot controller (e.g., 
"localhost:9000")
+   * @param properties Configuration properties for the client
+   * @param authHeaders Authentication headers for admin operations
+   * @throws PinotClientException If the client cannot be initialized
+   */
+  public PinotAdminClient(String controllerAddress, Properties properties, 
Map<String, String> authHeaders)
+      throws PinotClientException {
+    _controllerAddress = controllerAddress;
+    _transport = new PinotAdminTransport(properties, authHeaders);
+    _headers = authHeaders != null ? authHeaders : Map.of();
+    LOGGER.info("Created Pinot admin client for controller at {}", 
controllerAddress);
+  }
+
+  /**
+   * Creates a PinotAdminClient with authentication configuration.
+   *
+   * @param controllerAddress The address of the Pinot controller (e.g., 
"localhost:9000")
+   * @param properties Configuration properties for the client
+   * @param authType Authentication type
+   * @param authConfig Authentication configuration
+   * @throws PinotClientException If the client cannot be initialized
+   * @throws PinotAdminAuthenticationException If authentication configuration 
is invalid
+   */
+  public PinotAdminClient(String controllerAddress, Properties properties,
+      PinotAdminAuthentication.AuthType authType, Map<String, String> 
authConfig)
+      throws PinotClientException, PinotAdminAuthenticationException {
+    _controllerAddress = controllerAddress;
+    Map<String, String> authHeaders = 
PinotAdminAuthentication.createAuthHeaders(authType, authConfig);
+    _transport = new PinotAdminTransport(properties, authHeaders);
+    _headers = authHeaders;
+    LOGGER.info("Created Pinot admin client for controller at {} with {} 
authentication",
+        controllerAddress, authType);
+  }
+
+  // Package-private constructor for tests to inject a mocked transport
+  PinotAdminClient(String controllerAddress, PinotAdminTransport transport, 
Map<String, String> headers) {
+    _controllerAddress = controllerAddress;
+    _transport = transport;
+    _headers = headers != null ? headers : Map.of();
+  }
+
+  /**
+   * Gets the table administration client.
+   *
+   * @return Table administration operations
+   */
+  public PinotTableAdminClient getTableClient() {
+    if (_tableClient == null) {
+      _tableClient = new PinotTableAdminClient(_transport, _controllerAddress, 
_headers);
+    }
+    return _tableClient;
+  }
+
+  /**
+   * Gets the schema administration client.
+   *
+   * @return Schema administration operations
+   */
+  public PinotSchemaAdminClient getSchemaClient() {
+    if (_schemaClient == null) {
+      _schemaClient = new PinotSchemaAdminClient(_transport, 
_controllerAddress, _headers);
+    }
+    return _schemaClient;
+  }
+
+  /**
+   * Gets the instance administration client.
+   *
+   * @return Instance administration operations
+   */
+  public PinotInstanceAdminClient getInstanceClient() {
+    if (_instanceClient == null) {
+      _instanceClient = new PinotInstanceAdminClient(_transport, 
_controllerAddress, _headers);
+    }
+    return _instanceClient;
+  }
+
+  /**
+   * Gets the segment administration client.
+   *
+   * @return Segment administration operations
+   */
+  public PinotSegmentAdminClient getSegmentClient() {
+    if (_segmentClient == null) {
+      _segmentClient = new PinotSegmentAdminClient(_transport, 
_controllerAddress, _headers);
+    }
+    return _segmentClient;
+  }
+
+  /**
+   * Gets the tenant administration client.
+   *
+   * @return Tenant administration operations
+   */
+  public PinotTenantAdminClient getTenantClient() {
+    if (_tenantClient == null) {
+      _tenantClient = new PinotTenantAdminClient(_transport, 
_controllerAddress, _headers);
+    }
+    return _tenantClient;
+  }
+
+  /**
+   * Gets the task administration client.
+   *
+   * @return Task administration operations
+   */
+  public PinotTaskAdminClient getTaskClient() {
+    if (_taskClient == null) {
+      _taskClient = new PinotTaskAdminClient(_transport, _controllerAddress, 
_headers);
+    }
+    return _taskClient;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    try {
+      _transport.close();
+    } catch (PinotClientException e) {
+      throw new IOException("Failed to close admin client transport", e);
+    }
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
new file mode 100644
index 00000000000..3f567a0a5e2
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Example demonstrating how to use PinotAdminClient.
+ */
+public class PinotAdminClientExample {
+  private PinotAdminClientExample() {
+  }
+
+  public static void main(String[] args) {
+    // Example 1: Basic usage without authentication
+    try (PinotAdminClient adminClient = new 
PinotAdminClient("localhost:9000")) {
+      exampleBasicUsage(adminClient);
+    } catch (Exception e) {
+      System.err.println("Error in basic usage example: " + e.getMessage());
+    }
+
+    // Example 2: Usage with basic authentication
+    try {
+      Properties properties = new Properties();
+      properties.setProperty("pinot.admin.request.timeout.ms", "30000");
+
+      PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", 
properties,
+          PinotAdminAuthentication.AuthType.BASIC,
+          Map.of("username", "admin", "password", "password"));
+
+      exampleWithAuthentication(adminClient);
+      adminClient.close();
+    } catch (Exception e) {
+      System.err.println("Error in authentication example: " + e.getMessage());
+    }
+
+    // Example 3: Usage with bearer token authentication
+    try {
+      Properties properties = new Properties();
+      PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", 
properties,
+          PinotAdminAuthentication.AuthType.BEARER,
+          Map.of("token", "your-bearer-token"));
+
+      exampleWithBearerAuth(adminClient);
+      adminClient.close();
+    } catch (Exception e) {
+      System.err.println("Error in bearer auth example: " + e.getMessage());
+    }
+  }
+
+  private static void exampleBasicUsage(PinotAdminClient adminClient)
+      throws PinotAdminException {
+    System.out.println("=== Basic Usage Example ===");
+
+    try {
+      // List tables
+      var tables = adminClient.getTableClient().listTables(null, null, null);
+      System.out.println("Tables: " + tables);
+
+      // List schemas
+      var schemas = adminClient.getSchemaClient().listSchemaNames();
+      System.out.println("Schemas: " + schemas);
+
+      // List instances
+      var instances = adminClient.getInstanceClient().listInstances();
+      System.out.println("Instances: " + instances);
+
+      // List tenants
+      var tenants = adminClient.getTenantClient().listTenants();
+      System.out.println("Tenants: " + tenants);
+
+      // List task types
+      var taskTypes = adminClient.getTaskClient().listTaskTypes();
+      System.out.println("Task types: " + taskTypes);
+    } catch (PinotAdminException e) {
+      System.out.println("Admin operation failed: " + e.getMessage());
+    }
+  }
+
+  private static void exampleWithAuthentication(PinotAdminClient adminClient)
+      throws PinotAdminException {
+    System.out.println("=== Authentication Example ===");
+
+    try {
+      // Get a specific table configuration
+      String tableConfig = 
adminClient.getTableClient().getTableConfig("myTable");
+      System.out.println("Table config: " + tableConfig);
+
+      // Validate a schema
+      String schemaConfig =
+          
"{\"schemaName\":\"testSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
+      String validationResult = 
adminClient.getSchemaClient().validateSchema(schemaConfig);
+      System.out.println("Schema validation: " + validationResult);
+    } catch (PinotAdminAuthenticationException e) {
+      System.out.println("Authentication failed: " + e.getMessage());
+    } catch (PinotAdminNotFoundException e) {
+      System.out.println("Resource not found: " + e.getMessage());
+    } catch (PinotAdminException e) {
+      System.out.println("Admin operation failed: " + e.getMessage());
+    }
+  }
+
+  private static void exampleWithBearerAuth(PinotAdminClient adminClient)
+      throws PinotAdminException {
+    System.out.println("=== Bearer Authentication Example ===");
+
+    try {
+      // Create a new schema
+      String schemaConfig =
+          
"{\"schemaName\":\"exampleSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
+      String createResult = 
adminClient.getSchemaClient().createSchema(schemaConfig);
+      System.out.println("Schema creation: " + createResult);
+
+      // Get instance information
+      var liveInstances = adminClient.getInstanceClient().listLiveInstances();
+      System.out.println("Live instances: " + liveInstances);
+    } catch (PinotAdminAuthenticationException e) {
+      System.out.println("Authentication failed: " + e.getMessage());
+    } catch (PinotAdminValidationException e) {
+      System.out.println("Validation failed: " + e.getMessage());
+    } catch (PinotAdminException e) {
+      System.out.println("Admin operation failed: " + e.getMessage());
+    }
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminException.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminException.java
new file mode 100644
index 00000000000..cf767c02d29
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+/**
+ * Exception thrown by Pinot admin operations.
+ */
+public class PinotAdminException extends Exception {
+
+  public PinotAdminException(String message) {
+    super(message);
+  }
+
+  public PinotAdminException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public PinotAdminException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminNotFoundException.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminNotFoundException.java
new file mode 100644
index 00000000000..2173f55241d
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminNotFoundException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+/**
+ * Exception thrown when a requested resource is not found.
+ */
+public class PinotAdminNotFoundException extends PinotAdminException {
+
+  public PinotAdminNotFoundException(String message) {
+    super(message);
+  }
+
+  public PinotAdminNotFoundException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
new file mode 100644
index 00000000000..5b2ab6cbc88
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.ConnectionUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HTTP transport for Pinot admin operations.
+ * Handles communication with Pinot controller REST APIs.
+ */
+public class PinotAdminTransport implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminTransport.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Gets the ObjectMapper instance for JSON serialization/deserialization.
+   *
+   * @return ObjectMapper instance
+   */
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+
+  private final AsyncHttpClient _httpClient;
+  private final String _scheme;
+  private final Map<String, String> _defaultHeaders;
+  private final int _requestTimeoutMs;
+
+  public PinotAdminTransport(Properties properties, Map<String, String> 
authHeaders) {
+    _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
+
+    // Extract timeout configuration
+    _requestTimeoutMs = 
Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", 
"60000"));
+
+    // Extract scheme (http/https)
+    String scheme = properties.getProperty("pinot.admin.scheme", 
CommonConstants.HTTP_PROTOCOL);
+    _scheme = scheme;
+
+    // Build HTTP client
+    Builder builder = Dsl.config()
+        .setRequestTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setReadTimeout(Duration.ofMillis(_requestTimeoutMs))
+        .setConnectTimeout(Duration.ofMillis(10000)) // 10 second connect 
timeout
+        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
null));
+
+    // Configure SSL if needed
+    if (CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(scheme)) {
+      try {
+        SSLContext sslContext = SSLContext.getDefault();
+        builder.setSslContext(new 
io.netty.handler.ssl.JdkSslContext(sslContext, true,
+            io.netty.handler.ssl.ClientAuth.OPTIONAL));
+      } catch (Exception e) {
+        LOGGER.warn("Failed to configure SSL context, proceeding without SSL", 
e);
+      }
+    }
+
+    _httpClient = Dsl.asyncHttpClient(builder.build());
+    LOGGER.info("Initialized Pinot admin transport with scheme: {}, timeout: 
{}ms", scheme, _requestTimeoutMs);
+  }
+
+  /**
+   * Executes a GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeGet(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeGetAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute GET request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async GET request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeGetAsync(String controllerAddress, 
String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "GET", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute GET request to " + path, throwable);
+          throw new RuntimeException("Failed to execute GET request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePost(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePostAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute POST request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async POST request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePostAsync(String 
controllerAddress, String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "POST", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute POST request to " + path, throwable);
+          throw new RuntimeException("Failed to execute POST request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executePut(String controllerAddress, String path, Object 
body,
+      Map<String, String> queryParams, Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executePutAsync(controllerAddress, path, body, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute PUT request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async PUT request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param body Request body
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executePutAsync(String controllerAddress, 
String path, Object body,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "PUT", body, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute PUT request to " + path, throwable);
+          throw new RuntimeException("Failed to execute PUT request", 
throwable);
+        });
+  }
+
+  /**
+   * Executes a DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return Response JSON node
+   * @throws PinotAdminException If the request fails
+   */
+  public JsonNode executeDelete(String controllerAddress, String path, 
Map<String, String> queryParams,
+      Map<String, String> headers)
+      throws PinotAdminException {
+    try {
+      return executeDeleteAsync(controllerAddress, path, queryParams, 
headers).get(_requestTimeoutMs,
+          java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      }
+      throw new PinotAdminException("Failed to execute DELETE request to " + 
path, e);
+    }
+  }
+
+  /**
+   * Executes an async DELETE request to the specified path.
+   *
+   * @param controllerAddress Controller address
+   * @param path Request path
+   * @param queryParams Query parameters
+   * @param headers Additional headers
+   * @return CompletableFuture with response JSON node
+   */
+  public CompletableFuture<JsonNode> executeDeleteAsync(String 
controllerAddress, String path,
+      Map<String, String> queryParams, Map<String, String> headers) {
+    return executeRequestAsync(controllerAddress, path, "DELETE", null, 
queryParams, headers)
+        .thenApply(this::parseResponse)
+        .exceptionally(throwable -> {
+          LOGGER.error("Failed to execute DELETE request to " + path, 
throwable);
+          throw new RuntimeException("Failed to execute DELETE request", 
throwable);
+        });
+  }
+
+  private CompletableFuture<Response> executeRequestAsync(String 
controllerAddress, String path, String method,
+      Object body, Map<String, String> queryParams, Map<String, String> 
headers) {
+    String url = buildUrl(controllerAddress, path, queryParams);
+
+    BoundRequestBuilder requestBuilder = _httpClient.prepare(method, url);
+
+    // Add default headers
+    for (Map.Entry<String, String> header : _defaultHeaders.entrySet()) {
+      requestBuilder.addHeader(header.getKey(), header.getValue());
+    }
+
+    // Add request-specific headers
+    if (headers != null) {
+      for (Map.Entry<String, String> header : headers.entrySet()) {
+        requestBuilder.addHeader(header.getKey(), header.getValue());
+      }
+    }
+
+    // Set content type for requests with body
+    if (body != null) {
+      String bodyStr = body instanceof String ? (String) body : toJson(body);
+      requestBuilder.setBody(bodyStr).addHeader("Content-Type", 
"application/json");
+    }
+
+    return requestBuilder.execute().toCompletableFuture();
+  }
+
+  String buildUrl(String controllerAddress, String path, Map<String, String> 
queryParams) {
+    StringBuilder url = new 
StringBuilder(_scheme).append("://").append(controllerAddress).append(path);
+
+    if (queryParams != null && !queryParams.isEmpty()) {
+      url.append("?");
+      boolean first = true;
+      for (Map.Entry<String, String> param : queryParams.entrySet()) {
+        if (!first) {
+          url.append("&");
+        }
+        url.append(URLEncoder.encode(param.getKey(), 
StandardCharsets.UTF_8)).append("=");
+        if (param.getValue() != null) {
+          url.append(URLEncoder.encode(param.getValue(), 
StandardCharsets.UTF_8));
+        }
+        first = false;
+      }
+    }
+
+    return url.toString();
+  }
+
+  private String toJson(Object obj) {
+    try {
+      return OBJECT_MAPPER.writeValueAsString(obj);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to serialize object to JSON", e);
+    }
+  }
+
+  /**
+   * Parses a JSON array field into a List of Strings.
+   * Handles both actual JSON arrays and comma-separated strings for backward 
compatibility.
+   *
+   * @param response JSON response node
+   * @param fieldName Name of the field containing the array
+   * @return List of strings from the array field
+   * @throws PinotAdminException If the field is missing, null, or not in 
expected format
+   */
+  public List<String> parseStringArray(JsonNode response, String fieldName)
+      throws PinotAdminException {
+    JsonNode arrayNode = response.get(fieldName);
+    if (arrayNode == null) {
+      throw new PinotAdminException("Response missing '" + fieldName + "' 
field");
+    }
+
+    if (arrayNode.isArray()) {
+      // Handle JSON array format
+      java.util.List<String> result = new java.util.ArrayList<>();
+      for (JsonNode element : arrayNode) {
+        result.add(element.asText());
+      }
+      return result;
+    } else if (arrayNode.isTextual()) {
+      // Handle comma-separated string format for backward compatibility
+      String text = arrayNode.asText().trim();
+      if (text.isEmpty()) {
+        return java.util.Collections.emptyList();
+      }
+      return java.util.Arrays.asList(text.split(","));
+    } else {
+      throw new PinotAdminException("Field '" + fieldName + "' is not an array 
or string: " + arrayNode.getNodeType());
+    }
+  }
+
+  /**
+   * Safely parses a JSON array field into a List of Strings for async 
operations.
+   * Returns empty list on error instead of throwing exception.
+   *
+   * @param response JSON response node
+   * @param fieldName Name of the field containing the array
+   * @return List of strings from the array field, or empty list if parsing 
fails
+   */
+  public List<String> parseStringArraySafe(JsonNode response, String 
fieldName) {
+    try {
+      return parseStringArray(response, fieldName);
+    } catch (PinotAdminException e) {
+      LOGGER.warn("Failed to parse string array for field '{}': {}", 
fieldName, e.getMessage());
+      return java.util.Collections.emptyList();
+    }
+  }
+
+  private JsonNode parseResponse(Response response) {
+    try {
+      int statusCode = response.getStatusCode();
+      String responseBody = response.getResponseBody();
+
+      if (statusCode >= 200 && statusCode < 300) {
+        if (responseBody == null || responseBody.trim().isEmpty()) {
+          return OBJECT_MAPPER.createObjectNode();
+        }
+        return OBJECT_MAPPER.readTree(responseBody);
+      } else {
+        // Handle specific error cases
+        if (statusCode == 401) {
+          throw new PinotAdminAuthenticationException("Authentication failed: 
" + responseBody);
+        } else if (statusCode == 403) {
+          throw new PinotAdminAuthenticationException("Access forbidden: " + 
responseBody);
+        } else if (statusCode == 404) {
+          throw new PinotAdminNotFoundException("Resource not found: " + 
responseBody);
+        } else if (statusCode >= 400 && statusCode < 500) {
+          throw new PinotAdminValidationException("Client error (status: " + 
statusCode + "): " + responseBody);
+        } else {
+          throw new PinotAdminException("HTTP request failed with status: " + 
statusCode
+              + ", body: " + responseBody);
+        }
+      }
+    } catch (Exception e) {
+      int statusCode = -1;
+      String responseBodyExcerpt = "";
+      try {
+        if (response != null) {
+          statusCode = response.getStatusCode();
+          String body = response.getResponseBody();
+          if (body != null) {
+            responseBodyExcerpt = body.length() > 200 ? body.substring(0, 200) 
+ "..." : body;
+          }
+        }
+      } catch (Exception inner) {
+        // Ignore, use defaults
+      }
+      LOGGER.warn("Failed to parse response (status: {}, body excerpt: '{}')", 
statusCode, responseBodyExcerpt, e);
+      throw new RuntimeException(
+          "Failed to parse response (status: " + statusCode + ", body excerpt: 
'" + responseBodyExcerpt
+              + "', exception: " + e.getClass().getSimpleName() + ")",
+          e);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    if (_httpClient != null) {
+      try {
+        _httpClient.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close HTTP client", e);
+      }
+    }
+  }
+
+  /**
+   * Gets the HTTP client statistics.
+   *
+   * @return Client statistics
+   */
+  public ClientStats getClientMetrics() {
+    return _httpClient.getClientStats();
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminValidationException.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminValidationException.java
new file mode 100644
index 00000000000..44e8f8698e5
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminValidationException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+/**
+ * Exception thrown when validation fails for admin operations.
+ */
+public class PinotAdminValidationException extends PinotAdminException {
+
+  public PinotAdminValidationException(String message) {
+    super(message);
+  }
+
+  public PinotAdminValidationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotInstanceAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotInstanceAdminClient.java
new file mode 100644
index 00000000000..e5557c2a0a6
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotInstanceAdminClient.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for instance administration operations.
+ * Provides methods to create, update, delete, and manage Pinot instances.
+ */
+public class PinotInstanceAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotInstanceAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotInstanceAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all instances in the cluster.
+   *
+   * @return List of instance names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listInstances()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/instances", null, _headers);
+    return _transport.parseStringArray(response, "instances");
+  }
+
+  /**
+   * Lists all live instances in the cluster.
+   *
+   * @return List of live instance names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listLiveInstances()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/liveinstances", null, _headers);
+    return _transport.parseStringArray(response, "liveInstances");
+  }
+
+  /**
+   * Gets information about a specific instance.
+   *
+   * @param instanceName Name of the instance
+   * @return Instance information as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getInstance(String instanceName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/instances/" + instanceName, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Creates a new instance.
+   *
+   * @param instanceConfig Instance configuration as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String createInstance(String instanceConfig)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePost(_controllerAddress, 
"/instances", instanceConfig, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Enables or disables an instance.
+   *
+   * @param instanceName Name of the instance
+   * @param enabled Whether to enable or disable the instance
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String setInstanceState(String instanceName, boolean enabled)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePut(_controllerAddress, 
"/instances/" + instanceName + "/state",
+        enabled ? "enable" : "disable", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Enables, disables, or drops an instance.
+   *
+   * @param instanceName Name of the instance
+   * @param state State to set (enable, disable, or drop)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String setInstanceState(String instanceName, String state)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePost(_controllerAddress, 
"/instances/" + instanceName + "/state",
+        state, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Drops (deletes) an instance.
+   *
+   * @param instanceName Name of the instance to drop
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String dropInstance(String instanceName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/instances/" + instanceName, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates an existing instance.
+   *
+   * @param instanceName Name of the instance to update
+   * @param instanceConfig New instance configuration as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateInstance(String instanceName, String instanceConfig)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePut(_controllerAddress, 
"/instances/" + instanceName, instanceConfig,
+        null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates the tags of a specific instance.
+   *
+   * @param instanceName Name of the instance
+   * @param tagUpdateRequest Tag update request as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateInstanceTags(String instanceName, String 
tagUpdateRequest)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePut(_controllerAddress, 
"/instances/" + instanceName + "/updateTags",
+        tagUpdateRequest, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates the tables served by a broker instance in the broker resource.
+   *
+   * @param instanceName Name of the broker instance
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateBrokerResource(String instanceName)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executePost(_controllerAddress, "/instances/" + 
instanceName + "/updateBrokerResource",
+            null, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Validates whether it's safe to drop the given instances.
+   *
+   * @param instanceNames Comma-separated list of instance names to validate
+   * @return Validation response as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String validateDropInstances(String instanceNames)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("instanceNames", instanceNames);
+
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/instances/dropInstance/validate",
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Validates whether it's safe to update the tags of the given instances.
+   *
+   * @param instanceNames Comma-separated list of instance names to validate
+   * @param newTags New tags to assign
+   * @return Validation response as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String validateUpdateInstanceTags(String instanceNames, String 
newTags)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put("instanceNames", instanceNames);
+    queryParams.put("newTags", newTags);
+
+    JsonNode response = _transport.executePost(_controllerAddress, 
"/instances/updateTags/validate",
+        null, queryParams, _headers);
+    return response.toString();
+  }
+
+  // Async versions of key methods
+
+  /**
+   * Lists all instances in the cluster (async).
+   */
+  public CompletableFuture<List<String>> listInstancesAsync() {
+    return _transport.executeGetAsync(_controllerAddress, "/instances", null, 
_headers)
+        .thenApply(response -> _transport.parseStringArraySafe(response, 
"instances"));
+  }
+
+  /**
+   * Gets information about a specific instance (async).
+   */
+  public CompletableFuture<String> getInstanceAsync(String instanceName) {
+    return _transport.executeGetAsync(_controllerAddress, "/instances/" + 
instanceName, null, _headers)
+        .thenApply(JsonNode::toString);
+  }
+
+  /**
+   * Creates a new instance (async).
+   */
+  public CompletableFuture<String> createInstanceAsync(String instanceConfig) {
+    return _transport.executePostAsync(_controllerAddress, "/instances", 
instanceConfig, null, _headers)
+        .thenApply(JsonNode::toString);
+  }
+
+  /**
+   * Enables or disables an instance (async).
+   */
+  public CompletableFuture<String> setInstanceStateAsync(String instanceName, 
boolean enabled) {
+    return _transport.executePutAsync(_controllerAddress, "/instances/" + 
instanceName + "/state",
+            enabled ? "enable" : "disable", null, _headers)
+        .thenApply(JsonNode::toString);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSchemaAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSchemaAdminClient.java
new file mode 100644
index 00000000000..508d27191ca
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSchemaAdminClient.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for schema administration operations.
+ * Provides methods to create, update, delete, and manage Pinot schemas.
+ */
+public class PinotSchemaAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotSchemaAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotSchemaAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all schema names in the cluster.
+   *
+   * @return List of schema names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listSchemaNames()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/schemas", 
null, _headers);
+    return _transport.parseStringArray(response, "schemas");
+  }
+
+  /**
+   * Gets information about all schemas including field count details.
+   *
+   * @return Schema information as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getSchemasInfo()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/schemas/info", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets a specific schema by name.
+   *
+   * @param schemaName Name of the schema
+   * @return Schema configuration as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getSchema(String schemaName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/schemas/" 
+ schemaName, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes a schema by name.
+   *
+   * @param schemaName Name of the schema to delete
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteSchema(String schemaName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/schemas/" + schemaName, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates an existing schema.
+   *
+   * @param schemaName Name of the schema to update
+   * @param schemaConfig New schema configuration as JSON string
+   * @param reloadTables Whether to reload tables using this schema
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateSchema(String schemaName, String schemaConfig, boolean 
reloadTables)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("reloadTables", 
String.valueOf(reloadTables));
+
+    JsonNode response = _transport.executePut(_controllerAddress, "/schemas/" 
+ schemaName, schemaConfig,
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates an existing schema (without reloading tables).
+   *
+   * @param schemaName Name of the schema to update
+   * @param schemaConfig New schema configuration as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateSchema(String schemaName, String schemaConfig)
+      throws PinotAdminException {
+    return updateSchema(schemaName, schemaConfig, false);
+  }
+
+  /**
+   * Creates a new schema.
+   *
+   * @param schemaConfig Schema configuration as JSON string
+   * @param force Whether to force creation even if schema exists
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String createSchema(String schemaConfig, boolean force)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("force", String.valueOf(force));
+
+    JsonNode response = _transport.executePost(_controllerAddress, "/schemas", 
schemaConfig, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Creates a new schema (non-force creation).
+   *
+   * @param schemaConfig Schema configuration as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String createSchema(String schemaConfig)
+      throws PinotAdminException {
+    return createSchema(schemaConfig, false);
+  }
+
+  /**
+   * Validates a schema configuration without applying it.
+   *
+   * @param schemaConfig Schema configuration to validate as JSON string
+   * @param force Whether to force validation even if schema exists
+   * @return Validation response
+   * @throws PinotAdminException If the request fails
+   */
+  public String validateSchema(String schemaConfig, boolean force)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("force", String.valueOf(force));
+
+    JsonNode response = _transport.executePost(_controllerAddress, 
"/schemas/validate", schemaConfig,
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Validates a schema configuration without applying it (non-force 
validation).
+   *
+   * @param schemaConfig Schema configuration to validate as JSON string
+   * @return Validation response
+   * @throws PinotAdminException If the request fails
+   */
+  public String validateSchema(String schemaConfig)
+      throws PinotAdminException {
+    return validateSchema(schemaConfig, false);
+  }
+
+  /**
+   * Gets field specification metadata.
+   *
+   * @return Field specification metadata as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getFieldSpecMetadata()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/schemas/fieldSpec", null, _headers);
+    return response.toString();
+  }
+
+  // Async versions of key methods
+
+  /**
+   * Lists all schema names in the cluster (async).
+   */
+  public CompletableFuture<List<String>> listSchemaNamesAsync() {
+    return _transport.executeGetAsync(_controllerAddress, "/schemas", null, 
_headers)
+        .thenApply(response -> _transport.parseStringArraySafe(response, 
"schemas"));
+  }
+
+  /**
+   * Gets a specific schema by name (async).
+   */
+  public CompletableFuture<String> getSchemaAsync(String schemaName) {
+    return _transport.executeGetAsync(_controllerAddress, "/schemas/" + 
schemaName, null, _headers)
+        .thenApply(JsonNode::toString);
+  }
+
+  /**
+   * Creates a new schema (async).
+   */
+  public CompletableFuture<String> createSchemaAsync(String schemaConfig, 
boolean force) {
+    Map<String, String> queryParams = Map.of("force", String.valueOf(force));
+
+    return _transport.executePostAsync(_controllerAddress, "/schemas", 
schemaConfig, queryParams, _headers)
+        .thenApply(JsonNode::toString);
+  }
+
+  /**
+   * Creates a new schema (async, non-force).
+   */
+  public CompletableFuture<String> createSchemaAsync(String schemaConfig) {
+    return createSchemaAsync(schemaConfig, false);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentAdminClient.java
new file mode 100644
index 00000000000..dfde9fe4662
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentAdminClient.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Client for segment administration operations.
+ * Provides methods to manage and query Pinot segments.
+ */
+public class PinotSegmentAdminClient {
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotSegmentAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all segments for a table.
+   *
+   * @param tableName Name of the table
+   * @param excludeReplacedSegments Whether to exclude replaced segments
+   * @return List of segment names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listSegments(String tableName, boolean 
excludeReplacedSegments)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("excludeReplacedSegments", 
String.valueOf(excludeReplacedSegments));
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/segments/" 
+ tableName, queryParams, _headers);
+    return _transport.parseStringArray(response, "segments");
+  }
+
+  /**
+   * Lists all segments for a table (including replaced segments).
+   *
+   * @param tableName Name of the table
+   * @return List of segment names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listSegments(String tableName)
+      throws PinotAdminException {
+    return listSegments(tableName, false);
+  }
+
+  /**
+   * Gets a map from server to segments hosted by the server for a table.
+   *
+   * @param tableName Name of the table
+   * @return Server to segments map
+   * @throws PinotAdminException If the request fails
+   */
+  public String getServerToSegmentsMap(String tableName)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/segments/" + tableName + 
"/servers", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Lists segment lineage for a table in chronologically sorted order.
+   *
+   * @param tableName Name of the table
+   * @return Segment lineage as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String listSegmentLineage(String tableName)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/segments/" + tableName + 
"/lineage", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets a map from segment to CRC of the segment (only for OFFLINE tables).
+   *
+   * @param tableName Name of the table
+   * @return Segment to CRC map
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, String> getSegmentToCrcMap(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/segments/" 
+ tableName + "/crc", null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("segmentCrcMap"),
+        new TypeReference<Map<String, String>>() {
+        });
+  }
+
+  /**
+   * Gets the metadata for a specific segment.
+   *
+   * @param tableName Name of the table
+   * @param segmentName Name of the segment
+   * @param columns Specific columns to include (optional)
+   * @return Segment metadata
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, Object> getSegmentMetadata(String tableName, String 
segmentName,
+      List<String> columns)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (columns != null && !columns.isEmpty()) {
+      queryParams.put("columns", String.join(",", columns));
+    }
+
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/segments/" + tableName + 
"/" + segmentName + "/metadata",
+            queryParams, _headers);
+    return PinotAdminTransport.getObjectMapper().convertValue(response,
+        new TypeReference<Map<String, Object>>() {
+        });
+  }
+
+  /**
+   * Resets a segment by disabling it, waiting for external view to stabilize, 
and enabling it again.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @param segmentName Name of the segment
+   * @param targetInstance Target instance to reset (optional)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String resetSegment(String tableNameWithType, String segmentName, 
String targetInstance)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (targetInstance != null) {
+      queryParams.put("targetInstance", targetInstance);
+    }
+
+    JsonNode response =
+        _transport.executePost(_controllerAddress, "/segments/" + 
tableNameWithType + "/" + segmentName + "/reset",
+            null, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Resets all segments or error segments only for a table.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @param errorSegmentsOnly Whether to reset only error segments
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String resetSegments(String tableNameWithType, boolean 
errorSegmentsOnly)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("errorSegmentsOnly", 
String.valueOf(errorSegmentsOnly));
+
+    JsonNode response = _transport.executePost(_controllerAddress, 
"/segments/" + tableNameWithType + "/reset",
+        null, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes a specific segment.
+   *
+   * @param tableName Name of the table
+   * @param segmentName Name of the segment
+   * @param retentionPeriod Retention period for the segment (optional)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteSegment(String tableName, String segmentName, String 
retentionPeriod)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (retentionPeriod != null) {
+      queryParams.put("retention", retentionPeriod);
+    }
+
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/segments/" + tableName + "/" + segmentName,
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes multiple segments specified in query parameters or all segments 
if none specified.
+   *
+   * @param tableName Name of the table
+   * @param segmentNames Comma-separated list of segment names to delete 
(optional)
+   * @param retentionPeriod Retention period for the segments (optional)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteMultipleSegments(String tableName, String segmentNames,
+      String retentionPeriod)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (segmentNames != null) {
+      queryParams.put("segmentNames", segmentNames);
+    }
+    if (retentionPeriod != null) {
+      queryParams.put("retention", retentionPeriod);
+    }
+
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/segments/" + tableName,
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes segments specified in JSON array payload.
+   *
+   * @param tableName Name of the table
+   * @param segmentDeleteRequest Segment delete request as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteSegments(String tableName, String segmentDeleteRequest)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePost(_controllerAddress, 
"/segments/" + tableName + "/delete",
+        segmentDeleteRequest, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Selects segments based on time range criteria.
+   *
+   * @param tableName Name of the table
+   * @param startTimestampMs Start timestamp in milliseconds (inclusive)
+   * @param endTimestampMs End timestamp in milliseconds (exclusive)
+   * @param excludeReplacedSegments Whether to exclude replaced segments
+   * @return Selected segments as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String selectSegments(String tableName, long startTimestampMs, long 
endTimestampMs,
+      boolean excludeReplacedSegments)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put("startTimestampMs", String.valueOf(startTimestampMs));
+    queryParams.put("endTimestampMs", String.valueOf(endTimestampMs));
+    queryParams.put("excludeReplacedSegments", 
String.valueOf(excludeReplacedSegments));
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/segments/" 
+ tableName + "/select",
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets server metadata for all table segments.
+   *
+   * @param tableName Name of the table
+   * @return Server metadata as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getServerMetadata(String tableName)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/segments/" + tableName + 
"/metadata", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets a list of segments that are stale from servers hosting the table.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @return Stale segments response
+   * @throws PinotAdminException If the request fails
+   */
+  public String getStaleSegments(String tableNameWithType)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/segments/" 
+ tableNameWithType + "/isStale",
+        null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets the Zookeeper metadata for all table segments.
+   *
+   * @param tableName Name of the table
+   * @return Zookeeper metadata
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, Map<String, String>> getZookeeperMetadata(String 
tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/segments/" 
+ tableName + "/zkmetadata",
+        null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("zkMetadata"),
+        new TypeReference<Map<String, Map<String, String>>>() {
+        });
+  }
+
+  /**
+   * Gets storage tier for all segments in the given table.
+   *
+   * @param tableName Name of the table
+   * @return Storage tiers as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getStorageTiers(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/segments/" 
+ tableName + "/tiers", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets storage tiers for a specific segment.
+   *
+   * @param tableName Name of the table
+   * @param segmentName Name of the segment
+   * @param tableType Table type (OFFLINE or REALTIME)
+   * @return Storage tiers as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getSegmentStorageTiers(String tableName, String segmentName, 
String tableType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("type", tableType);
+
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/segments/" + tableName + 
"/" + segmentName + "/tiers",
+            queryParams, _headers);
+    return response.toString();
+  }
+
+  // Async versions of key methods
+
+  /**
+   * Lists all segments for a table (async).
+   */
+  public CompletableFuture<List<String>> listSegmentsAsync(String tableName, 
boolean excludeReplacedSegments) {
+    Map<String, String> queryParams = Map.of("excludeReplacedSegments", 
String.valueOf(excludeReplacedSegments));
+
+    return _transport.executeGetAsync(_controllerAddress, "/segments/" + 
tableName, queryParams, _headers)
+        .thenApply(response -> _transport.parseStringArraySafe(response, 
"segments"));
+  }
+
+  /**
+   * Gets the metadata for a specific segment (async).
+   */
+  public CompletableFuture<Map<String, Object>> getSegmentMetadataAsync(String 
tableName, String segmentName,
+      List<String> columns) {
+    Map<String, String> queryParams = new HashMap<>();
+    if (columns != null && !columns.isEmpty()) {
+      queryParams.put("columns", String.join(",", columns));
+    }
+
+    return _transport.executeGetAsync(_controllerAddress, "/segments/" + 
tableName + "/" + segmentName + "/metadata",
+            queryParams, _headers)
+        .thenApply(response -> 
PinotAdminTransport.getObjectMapper().convertValue(response,
+            new TypeReference<Map<String, Object>>() {
+            }));
+  }
+
+  /**
+   * Deletes a specific segment (async).
+   */
+  public CompletableFuture<String> deleteSegmentAsync(String tableName, String 
segmentName,
+      String retentionPeriod) {
+    Map<String, String> queryParams = new HashMap<>();
+    if (retentionPeriod != null) {
+      queryParams.put("retention", retentionPeriod);
+    }
+
+    return _transport.executeDeleteAsync(_controllerAddress, "/segments/" + 
tableName + "/" + segmentName,
+            queryParams, _headers)
+        .thenApply(JsonNode::toString);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTableAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTableAdminClient.java
new file mode 100644
index 00000000000..5f40bad1975
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTableAdminClient.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for table administration operations.
+ * Provides methods to create, update, delete, and manage Pinot tables.
+ */
+public class PinotTableAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTableAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotTableAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all tables in the cluster.
+   *
+   * @param tableType Filter by table type (realtime, offline, dimension)
+   * @param taskType Filter by task type
+   * @param sortType Sort by (name, creationTime, lastModifiedTime)
+   * @return List of table names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listTables(String tableType, String taskType,
+      String sortType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tableType != null) {
+      queryParams.put("type", tableType);
+    }
+    if (taskType != null) {
+      queryParams.put("taskType", taskType);
+    }
+    if (sortType != null) {
+      queryParams.put("sortType", sortType);
+    }
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables", 
queryParams, _headers);
+    return _transport.parseStringArray(response, "tables");
+  }
+
+  /**
+   * Gets the configuration for a specific table.
+   *
+   * @param tableName Name of the table
+   * @return Table configuration as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTableConfig(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables/" + 
tableName, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Creates a new table with the specified configuration.
+   *
+   * @param tableConfig Table configuration as JSON string
+   * @param validationTypesToSkip Comma-separated list of validation types to 
skip
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String createTable(String tableConfig, String validationTypesToSkip)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (validationTypesToSkip != null) {
+      queryParams.put("validationTypesToSkip", validationTypesToSkip);
+    }
+
+    JsonNode response = _transport.executePost(_controllerAddress, "/tables", 
tableConfig, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates the configuration of an existing table.
+   *
+   * @param tableName Name of the table
+   * @param tableConfig New table configuration as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateTableConfig(String tableName, String tableConfig)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePut(_controllerAddress, "/tables/" + 
tableName, tableConfig, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes a table.
+   *
+   * @param tableName Name of the table to delete
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteTable(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/tables/" + tableName, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Validates a table configuration without applying it.
+   *
+   * @param tableConfig Table configuration to validate as JSON string
+   * @return Validation response
+   * @throws PinotAdminException If the request fails
+   */
+  public String validateTableConfig(String tableConfig)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePost(_controllerAddress, 
"/tables/validate", tableConfig, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Rebalances a table (reassigns instances and segments).
+   *
+   * @param tableName Name of the table to rebalance
+   * @param noDowntime Whether to allow rebalance without downtime
+   * @param rebalanceMode Rebalance mode (default or specific)
+   * @param minReplicasToKeepAfterRebalance Minimum replicas to keep after 
rebalance
+   * @return Rebalance result
+   * @throws PinotAdminException If the request fails
+   */
+  public String rebalanceTable(String tableName, boolean noDowntime, String 
rebalanceMode,
+      Integer minReplicasToKeepAfterRebalance)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put("noDowntime", String.valueOf(noDowntime));
+    if (rebalanceMode != null) {
+      queryParams.put("rebalanceMode", rebalanceMode);
+    }
+    if (minReplicasToKeepAfterRebalance != null) {
+      queryParams.put("minReplicasToKeepAfterRebalance", 
String.valueOf(minReplicasToKeepAfterRebalance));
+    }
+
+    JsonNode response = _transport.executePost(_controllerAddress, "/tables/" 
+ tableName + "/rebalance", null,
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Cancels all rebalance jobs for a table.
+   *
+   * @param tableName Name of the table
+   * @return List of cancelled job IDs
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> cancelRebalance(String tableName)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeDelete(_controllerAddress, "/tables/" + tableName + 
"/rebalance", null, _headers);
+    return _transport.parseStringArray(response, "jobIds");
+  }
+
+  /**
+   * Gets the current state of a table.
+   *
+   * @param tableName Name of the table
+   * @param tableType Table type (realtime or offline)
+   * @return Table state
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTableState(String tableName, String tableType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("type", tableType);
+
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tables/" + tableName + 
"/state", queryParams, _headers);
+    return response.get("state").asText();
+  }
+
+  /**
+   * Enables or disables a table.
+   *
+   * @param tableName Name of the table
+   * @param tableType Table type (realtime or offline)
+   * @param enabled Whether to enable or disable the table
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String setTableState(String tableName, String tableType, boolean 
enabled)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("type", tableType);
+
+    JsonNode response = _transport.executePut(_controllerAddress, "/tables/" + 
tableName + "/state",
+        enabled ? "enable" : "disable", queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets statistics for a table.
+   *
+   * @param tableName Name of the table
+   * @return Table statistics
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTableStats(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables/" + 
tableName + "/stats", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets the status of a table including ingestion status.
+   *
+   * @param tableName Name of the table
+   * @return Table status
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTableStatus(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables/" + 
tableName + "/status", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets aggregate metadata for all segments of a table.
+   *
+   * @param tableName Name of the table
+   * @return Table metadata
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTableMetadata(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables/" + 
tableName + "/metadata", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets aggregate valid document IDs metadata for all segments of a table.
+   *
+   * @param tableName Name of the table
+   * @return Valid document IDs metadata
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTableValidDocIdsMetadata(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables/" + 
tableName + "/validDocIdsMetadata",
+        null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets aggregate index details for all segments of a table.
+   *
+   * @param tableName Name of the table
+   * @return Table indexes
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTableIndexes(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tables/" + 
tableName + "/indexes", null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Sets the time boundary for a hybrid table based on offline segments' 
metadata.
+   *
+   * @param tableName Name of the hybrid table (without type suffix)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String setTimeBoundary(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePost(_controllerAddress, "/tables/" 
+ tableName + "/timeBoundary", null,
+        null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes the time boundary for a hybrid table.
+   *
+   * @param tableName Name of the hybrid table (without type suffix)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteTimeBoundary(String tableName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/tables/" + tableName + "/timeBoundary",
+        null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets recommended configuration for a table.
+   *
+   * @param inputJson Input configuration for recommendation
+   * @return Recommended configuration
+   * @throws PinotAdminException If the request fails
+   */
+  public String recommendConfig(String inputJson)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePut(_controllerAddress, 
"/tables/recommender", inputJson, null, _headers);
+    return response.asText();
+  }
+
+  // Async versions of key methods
+
+  /**
+   * Lists all tables in the cluster (async).
+   */
+  public CompletableFuture<List<String>> listTablesAsync(String tableType, 
String taskType,
+      String sortType) {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tableType != null) {
+      queryParams.put("type", tableType);
+    }
+    if (taskType != null) {
+      queryParams.put("taskType", taskType);
+    }
+    if (sortType != null) {
+      queryParams.put("sortType", sortType);
+    }
+
+    return _transport.executeGetAsync(_controllerAddress, "/tables", 
queryParams, _headers)
+        .thenApply(response -> _transport.parseStringArraySafe(response, 
"tables"));
+  }
+
+  /**
+   * Gets the configuration for a specific table (async).
+   */
+  public CompletableFuture<String> getTableConfigAsync(String tableName) {
+    return _transport.executeGetAsync(_controllerAddress, "/tables/" + 
tableName, null, _headers)
+        .thenApply(JsonNode::toString);
+  }
+
+  /**
+   * Creates a new table with the specified configuration (async).
+   */
+  public CompletableFuture<String> createTableAsync(String tableConfig, String 
validationTypesToSkip) {
+    Map<String, String> queryParams = new HashMap<>();
+    if (validationTypesToSkip != null) {
+      queryParams.put("validationTypesToSkip", validationTypesToSkip);
+    }
+
+    return _transport.executePostAsync(_controllerAddress, "/tables", 
tableConfig, queryParams, _headers)
+        .thenApply(JsonNode::toString);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTaskAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTaskAdminClient.java
new file mode 100644
index 00000000000..a8cb0b3b352
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTaskAdminClient.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.helix.task.TaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for task administration operations.
+ * Provides methods to monitor and manage Pinot tasks.
+ */
+public class PinotTaskAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTaskAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotTaskAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all task types available in the cluster.
+   *
+   * @return Set of task type names
+   * @throws PinotAdminException If the request fails
+   */
+  public Set<String> listTaskTypes()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/tasks/tasktypes", null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("taskTypes"), 
Set.class);
+  }
+
+  /**
+   * Gets the state (task queue state) for the given task type.
+   *
+   * @param taskType Task type name
+   * @return Task state
+   * @throws PinotAdminException If the request fails
+   */
+  public TaskState getTaskQueueState(String taskType)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tasks/" + 
taskType + "/state", null, _headers);
+    return TaskState.valueOf(response.get("state").asText());
+  }
+
+  /**
+   * Lists all tasks for the given task type.
+   *
+   * @param taskType Task type name
+   * @return Set of task names
+   * @throws PinotAdminException If the request fails
+   */
+  public Set<String> getTasks(String taskType)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tasks/" + 
taskType + "/tasks", null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("tasks"), 
Set.class);
+  }
+
+  /**
+   * Gets the count of all tasks for the given task type.
+   *
+   * @param taskType Task type name
+   * @return Task count
+   * @throws PinotAdminException If the request fails
+   */
+  public int getTasksCount(String taskType)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tasks/" + taskType + 
"/tasks/count", null, _headers);
+    return response.get("count").asInt();
+  }
+
+  /**
+   * Lists all tasks for the given task type and table.
+   *
+   * @param taskType Task type name
+   * @param tableNameWithType Table name with type suffix
+   * @return Map of task names to task states
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, TaskState> getTaskStatesByTable(String taskType, String 
tableNameWithType)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tasks/" + taskType + "/" + 
tableNameWithType + "/state",
+            null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("taskStates"), 
Map.class);
+  }
+
+  /**
+   * Gets task metadata for the given task type and table.
+   *
+   * @param taskType Task type name
+   * @param tableNameWithType Table name with type suffix
+   * @return Task metadata as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTaskMetadataByTable(String taskType, String 
tableNameWithType)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tasks/" + taskType + "/" + 
tableNameWithType + "/metadata",
+            null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes task metadata for the given task type and table.
+   *
+   * @param taskType Task type name
+   * @param tableNameWithType Table name with type suffix
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteTaskMetadataByTable(String taskType, String 
tableNameWithType)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeDelete(_controllerAddress, "/tasks/" + taskType + 
"/" + tableNameWithType + "/metadata",
+            null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Fetches count of sub-tasks for each of the tasks for the given task type.
+   *
+   * @param taskType Task type name
+   * @param state Task state(s) to filter by (optional)
+   * @param tableNameWithType Table name with type to filter by (optional)
+   * @param minNumSubtasks Minimum number of subtasks to filter by (optional)
+   * @return Task counts as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTaskCounts(String taskType, String state, String 
tableNameWithType,
+      Integer minNumSubtasks)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (state != null) {
+      queryParams.put("state", state);
+    }
+    if (tableNameWithType != null) {
+      queryParams.put("tableNameWithType", tableNameWithType);
+    }
+    if (minNumSubtasks != null) {
+      queryParams.put("minNumSubtasks", String.valueOf(minNumSubtasks));
+    }
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tasks/" + 
taskType + "/taskcounts",
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Fetches debug information for all tasks for the given task type.
+   *
+   * @param taskType Task type name
+   * @param verbosity Verbosity level (0 by default)
+   * @return Task debug information as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTasksDebugInfo(String taskType, int verbosity)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("verbosity", 
String.valueOf(verbosity));
+
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tasks/" + taskType + 
"/debug", queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Fetches debug information for all tasks for the given task type and table.
+   *
+   * @param taskType Task type name
+   * @param tableNameWithType Table name with type suffix
+   * @param verbosity Verbosity level (0 by default)
+   * @return Task debug information as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTasksDebugInfo(String taskType, String tableNameWithType, 
int verbosity)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("verbosity", 
String.valueOf(verbosity));
+
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tasks/" + taskType + "/" + 
tableNameWithType + "/debug",
+            queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Fetches task generation information for the recent runs of the given task 
for the given table.
+   *
+   * @param taskType Task type name
+   * @param tableNameWithType Table name with type suffix
+   * @param localOnly Whether to only lookup local cache for logs
+   * @return Task generation debug information as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTaskGenerationDebugInfo(String taskType, String 
tableNameWithType, boolean localOnly)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("localOnly", 
String.valueOf(localOnly));
+
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tasks/generator/" + 
tableNameWithType + "/" + taskType + "/debug",
+            queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Fetches debug information for the given task name.
+   *
+   * @param taskName Task name
+   * @param verbosity Verbosity level (0 by default)
+   * @param tableNameWithType Table name with type to filter by (optional)
+   * @return Task debug information as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTaskDebugInfo(String taskName, int verbosity, String 
tableNameWithType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put("verbosity", String.valueOf(verbosity));
+    if (tableNameWithType != null) {
+      queryParams.put("tableNameWithType", tableNameWithType);
+    }
+
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/tasks/task/" + taskName + "/debug",
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets a map from task to task state for the given task type.
+   *
+   * @param taskType Task type name
+   * @return Map of task names to task states
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, TaskState> getTaskStates(String taskType)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tasks/" + 
taskType + "/taskstates", null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("taskStates"), 
Map.class);
+  }
+
+  /**
+   * Gets the task state for the given task.
+   *
+   * @param taskName Task name
+   * @return Task state
+   * @throws PinotAdminException If the request fails
+   */
+  public TaskState getTaskState(String taskName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/tasks/task/" + taskName + "/state", null, _headers);
+    return TaskState.valueOf(response.get("state").asText());
+  }
+
+  /**
+   * Gets the states of all sub-tasks for the given task.
+   *
+   * @param taskName Task name
+   * @return Map of sub-task names to sub-task states
+   * @throws PinotAdminException If the request fails
+   */
+  public Map<String, String> getSubtaskStates(String taskName)
+      throws PinotAdminException {
+    JsonNode response =
+        _transport.executeGet(_controllerAddress, "/tasks/subtask/" + taskName 
+ "/state", null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("subtaskStates"),
 Map.class);
+  }
+
+  // Async versions of key methods
+
+  /**
+   * Lists all task types available in the cluster (async).
+   */
+  public CompletableFuture<Set<String>> listTaskTypesAsync() {
+    return _transport.executeGetAsync(_controllerAddress, "/tasks/tasktypes", 
null, _headers)
+        .thenApply(response -> PinotAdminTransport.getObjectMapper()
+            .convertValue(response.get("taskTypes"), Set.class));
+  }
+
+  /**
+   * Gets the state for the given task type (async).
+   */
+  public CompletableFuture<TaskState> getTaskQueueStateAsync(String taskType) {
+    return _transport.executeGetAsync(_controllerAddress, "/tasks/" + taskType 
+ "/state", null, _headers)
+        .thenApply(response -> 
TaskState.valueOf(response.get("state").asText()));
+  }
+
+  /**
+   * Lists all tasks for the given task type (async).
+   */
+  public CompletableFuture<Set<String>> getTasksAsync(String taskType) {
+    return _transport.executeGetAsync(_controllerAddress, "/tasks/" + taskType 
+ "/tasks", null, _headers)
+        .thenApply(response -> 
PinotAdminTransport.getObjectMapper().convertValue(response.get("tasks"), 
Set.class));
+  }
+
+  /**
+   * Gets the task state for the given task (async).
+   */
+  public CompletableFuture<TaskState> getTaskStateAsync(String taskName) {
+    return _transport.executeGetAsync(_controllerAddress, "/tasks/task/" + 
taskName + "/state", null, _headers)
+        .thenApply(response -> 
TaskState.valueOf(response.get("state").asText()));
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTenantAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTenantAdminClient.java
new file mode 100644
index 00000000000..5ec6746590f
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotTenantAdminClient.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client for tenant administration operations.
+ * Provides methods to create, update, delete, and manage Pinot tenants.
+ */
+public class PinotTenantAdminClient {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTenantAdminClient.class);
+
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotTenantAdminClient(PinotAdminTransport transport, String 
controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  /**
+   * Lists all tenants in the cluster.
+   *
+   * @return List of tenant names
+   * @throws PinotAdminException If the request fails
+   */
+  public List<String> listTenants()
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tenants", 
null, _headers);
+    return 
PinotAdminTransport.getObjectMapper().convertValue(response.get("tenants"), 
List.class);
+  }
+
+  /**
+   * Creates a new tenant.
+   *
+   * @param tenantConfig Tenant configuration as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String createTenant(String tenantConfig)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePost(_controllerAddress, "/tenants", 
tenantConfig, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates an existing tenant.
+   *
+   * @param tenantConfig Updated tenant configuration as JSON string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateTenant(String tenantConfig)
+      throws PinotAdminException {
+    JsonNode response = _transport.executePut(_controllerAddress, "/tenants", 
tenantConfig, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets instances for a specific tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @param tenantType Tenant type (server or broker)
+   * @param tableType Table type (offline or realtime)
+   * @return Tenant instances as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTenantInstances(String tenantName, String tenantType, 
String tableType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tenantType != null) {
+      queryParams.put("type", tenantType);
+    }
+    if (tableType != null) {
+      queryParams.put("tableType", tableType);
+    }
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tenants/" 
+ tenantName, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets instances for a specific tenant (server type by default).
+   *
+   * @param tenantName Name of the tenant
+   * @return Tenant instances as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTenantInstances(String tenantName)
+      throws PinotAdminException {
+    return getTenantInstances(tenantName, null, null);
+  }
+
+  /**
+   * Enables or disables a tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @param tenantType Tenant type (server or broker)
+   * @param state State to set (enable or disable)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String setTenantState(String tenantName, String tenantType, String 
state)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tenantType != null) {
+      queryParams.put("type", tenantType);
+    }
+    queryParams.put("state", state);
+
+    JsonNode response =
+        _transport.executePut(_controllerAddress, "/tenants/" + tenantName, 
null, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets tables on a server or broker tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @param tenantType Tenant type (server or broker)
+   * @param withTableProperties Whether to include table properties
+   * @return Tenant tables as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTenantTables(String tenantName, String tenantType, boolean 
withTableProperties)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tenantType != null) {
+      queryParams.put("type", tenantType);
+    }
+    queryParams.put("withTableProperties", 
String.valueOf(withTableProperties));
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tenants/" 
+ tenantName + "/tables",
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets the instance partitions of a tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @param instancePartitionType Instance partition type (OFFLINE, CONSUMING, 
COMPLETED)
+   * @return Instance partitions as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getInstancePartitions(String tenantName, String 
instancePartitionType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("instancePartitionType", 
instancePartitionType);
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tenants/" 
+ tenantName + "/instancePartitions",
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Updates an instance partition for a server type in a tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @param instancePartitionType Instance partition type (OFFLINE, CONSUMING, 
COMPLETED)
+   * @param instancePartitionsConfig Instance partitions configuration as JSON 
string
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String updateInstancePartitions(String tenantName, String 
instancePartitionType,
+      String instancePartitionsConfig)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("instancePartitionType", 
instancePartitionType);
+
+    JsonNode response = _transport.executePut(_controllerAddress, "/tenants/" 
+ tenantName + "/instancePartitions",
+        instancePartitionsConfig, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets tenant metadata information.
+   *
+   * @param tenantName Name of the tenant
+   * @param tenantType Tenant type (optional)
+   * @return Tenant metadata as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getTenantMetadata(String tenantName, String tenantType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tenantType != null) {
+      queryParams.put("type", tenantType);
+    }
+
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tenants/" 
+ tenantName + "/metadata",
+        queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Changes tenant state.
+   *
+   * @param tenantName Name of the tenant
+   * @param tenantType Tenant type (optional)
+   * @param state New state (enable, disable, drop)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String changeTenantState(String tenantName, String tenantType, String 
state)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tenantType != null) {
+      queryParams.put("type", tenantType);
+    }
+    queryParams.put("state", state);
+
+    JsonNode response = _transport.executePut(_controllerAddress, "/tenants/" 
+ tenantName + "/metadata",
+        null, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Deletes a tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @param tenantType Tenant type (SERVER or BROKER)
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String deleteTenant(String tenantName, String tenantType)
+      throws PinotAdminException {
+    Map<String, String> queryParams = Map.of("type", tenantType);
+
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/tenants/" + tenantName, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Cancels a running tenant rebalance job.
+   *
+   * @param jobId Job ID of the rebalance job
+   * @return Success response
+   * @throws PinotAdminException If the request fails
+   */
+  public String cancelRebalance(String jobId)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeDelete(_controllerAddress, 
"/tenants/rebalance/" + jobId, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Rebalances all tables that are part of the tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @param degreeOfParallelism Number of table rebalance jobs allowed to run 
at the same time
+   * @param includeTableTypes Comma-separated list of table types to include 
(optional)
+   * @param excludeTableTypes Comma-separated list of table types to exclude 
(optional)
+   * @param rebalanceMode Rebalance mode (optional)
+   * @return Rebalance result
+   * @throws PinotAdminException If the request fails
+   */
+  public String rebalanceTenant(String tenantName, int degreeOfParallelism, 
String includeTableTypes,
+      String excludeTableTypes, String rebalanceMode)
+      throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put("degreeOfParallelism", 
String.valueOf(degreeOfParallelism));
+    if (includeTableTypes != null) {
+      queryParams.put("includeTableTypes", includeTableTypes);
+    }
+    if (excludeTableTypes != null) {
+      queryParams.put("excludeTableTypes", excludeTableTypes);
+    }
+    if (rebalanceMode != null) {
+      queryParams.put("rebalanceMode", rebalanceMode);
+    }
+
+    JsonNode response = _transport.executePost(_controllerAddress, "/tenants/" 
+ tenantName + "/rebalance",
+        null, queryParams, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets detailed stats of a tenant rebalance operation.
+   *
+   * @param jobId Tenant rebalance job ID
+   * @return Rebalance status as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getRebalanceStatus(String jobId)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, 
"/tenants/rebalanceStatus/" + jobId, null, _headers);
+    return response.toString();
+  }
+
+  /**
+   * Gets list of rebalance jobs for a tenant.
+   *
+   * @param tenantName Name of the tenant
+   * @return Rebalance jobs as JSON string
+   * @throws PinotAdminException If the request fails
+   */
+  public String getRebalanceJobs(String tenantName)
+      throws PinotAdminException {
+    JsonNode response = _transport.executeGet(_controllerAddress, "/tenants/" 
+ tenantName + "/rebalanceJobs",
+        null, _headers);
+    return response.toString();
+  }
+
+  // Async versions of key methods
+
+  /**
+   * Lists all tenants in the cluster (async).
+   */
+  public CompletableFuture<List<String>> listTenantsAsync() {
+    return _transport.executeGetAsync(_controllerAddress, "/tenants", null, 
_headers)
+        .thenApply(response -> 
PinotAdminTransport.getObjectMapper().convertValue(response.get("tenants"), 
List.class));
+  }
+
+  /**
+   * Creates a new tenant (async).
+   */
+  public CompletableFuture<String> createTenantAsync(String tenantConfig) {
+    return _transport.executePostAsync(_controllerAddress, "/tenants", 
tenantConfig, null, _headers)
+        .thenApply(JsonNode::toString);
+  }
+
+  /**
+   * Gets tenant metadata information (async).
+   */
+  public CompletableFuture<String> getTenantMetadataAsync(String tenantName, 
String tenantType) {
+    Map<String, String> queryParams = new HashMap<>();
+    if (tenantType != null) {
+      queryParams.put("type", tenantType);
+    }
+
+    return _transport.executeGetAsync(_controllerAddress, "/tenants/" + 
tenantName + "/metadata", queryParams, _headers)
+        .thenApply(JsonNode::toString);
+  }
+
+  /**
+   * Deletes a tenant (async).
+   */
+  public CompletableFuture<String> deleteTenantAsync(String tenantName, String 
tenantType) {
+    Map<String, String> queryParams = Map.of("type", tenantType);
+
+    return _transport.executeDeleteAsync(_controllerAddress, "/tenants/" + 
tenantName, queryParams, _headers)
+        .thenApply(JsonNode::toString);
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
new file mode 100644
index 00000000000..b59ccd67ab7
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
@@ -0,0 +1,336 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# Pinot Admin Client
+
+The Pinot Admin Client provides a Java API for all administrative operations 
available through the Pinot controller REST
+APIs. This client allows you to programmatically manage Pinot clusters, 
including tables, schemas, instances, segments,
+tenants, and tasks.
+
+## Features
+
+- **Complete API Coverage**: Codifies all Pinot controller REST APIs
+- **Authentication Support**: Supports Basic, Bearer, and custom authentication
+- **Async Operations**: Provides both synchronous and asynchronous methods
+- **Comprehensive Exception Handling**: Specific exception types for different 
error scenarios
+- **Type Safety**: Strongly typed API with proper error handling
+
+## Architecture
+
+The admin client consists of:
+
+1. **PinotAdminClient**: Main entry point that provides access to all service 
clients
+2. **PinotAdminTransport**: HTTP transport layer for communicating with the 
controller
+3. **Service Clients**:
+    - `PinotTableAdminClient`: Table operations (CRUD, status, metadata)
+    - `PinotSchemaAdminClient`: Schema operations (CRUD, validation)
+    - `PinotInstanceAdminClient`: Instance operations (CRUD, state management)
+    - `PinotSegmentAdminClient`: Segment operations (query, management)
+    - `PinotTenantAdminClient`: Tenant operations (CRUD, configuration)
+    - `PinotTaskAdminClient`: Task management operations
+4. **Exception Classes**: Specialized exceptions for different error types
+5. **Authentication Utilities**: Support for various authentication methods
+
+## Usage Examples
+
+### Basic Usage
+
+```java
+import org.apache.pinot.client.admin.*;
+
+// Create client without authentication
+try(PinotAdminClient adminClient = new PinotAdminClient("localhost:9000")){
+// List all tables
+List<String> tables = adminClient.getTableClient().listTables(null, null, 
null);
+
+// Get a specific table configuration
+String config = adminClient.getTableClient().getTableConfig("myTable");
+
+// List schemas
+List<String> schemas = adminClient.getSchemaClient().listSchemaNames();
+}
+```
+
+### With Authentication
+
+```java
+import org.apache.pinot.client.admin.*;
+import java.util.Map;
+import java.util.Properties;
+
+// Create client with basic authentication
+Properties properties = new Properties();
+properties.setProperty("pinot.admin.request.timeout.ms", "30000");
+
+PinotAdminClient adminClient = new PinotAdminClient(
+    "localhost:9000",
+    properties,
+    PinotAdminAuthentication.AuthType.BASIC,
+    Map.of("username", "admin", "password", "password")
+);
+
+try {
+  // Create a new schema
+  String schemaConfig =
+      
"{\"schemaName\":\"mySchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
+  String result = adminClient.getSchemaClient().createSchema(schemaConfig);
+} finally {
+  adminClient.close();
+}
+```
+
+### Async Operations
+
+```java
+import org.apache.pinot.client.admin.*;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+// Assume adminClient is already created
+CompletableFuture<List<String>> tablesFuture =
+    adminClient.getTableClient().listTablesAsync(null, null, null);
+CompletableFuture<List<String>> schemasFuture =
+    adminClient.getSchemaClient().listSchemaNamesAsync();
+
+try {
+  List<String> tables = tablesFuture.get();
+  List<String> schemas = schemasFuture.get();
+} catch (Exception e) {
+  // Handle exceptions
+}
+```
+
+## Service Clients
+
+### Table Operations
+
+```java
+PinotTableAdminClient tableClient = adminClient.getTableClient();
+
+// List tables with filters
+List<String> tables = tableClient.listTables("offline", null, "name");
+
+// Create a table
+String result = tableClient.createTable(tableConfigJson, "ALL");
+
+// Update table configuration
+String updateResult = tableClient.updateTableConfig("myTable", newConfigJson);
+
+// Get table status
+String status = tableClient.getTableStatus("myTable");
+
+// Rebalance a table
+String rebalanceResult = tableClient.rebalanceTable("myTable", true, 
"default", 1);
+```
+
+### Schema Operations
+
+```java
+PinotSchemaAdminClient schemaClient = adminClient.getSchemaClient();
+
+// List all schemas
+List<String> schemas = schemaClient.listSchemaNames();
+
+// Get schema configuration
+String schema = schemaClient.getSchema("mySchema");
+
+// Create a new schema
+String createResult = schemaClient.createSchema(schemaConfigJson);
+
+// Validate schema before creating
+String validation = schemaClient.validateSchema(schemaConfigJson);
+```
+
+### Instance Operations
+
+```java
+PinotInstanceAdminClient instanceClient = adminClient.getInstanceClient();
+
+// List all instances
+List<String> instances = instanceClient.listInstances();
+
+// Get instance information
+String instanceInfo = instanceClient.getInstance("Server_192.168.1.1_8098");
+
+// Enable/disable instance
+String result = instanceClient.setInstanceState("Server_192.168.1.1_8098", 
true);
+
+// Update instance configuration
+String updateResult = instanceClient.updateInstance("Server_192.168.1.1_8098", 
configJson);
+```
+
+### Segment Operations
+
+```java
+PinotSegmentAdminClient segmentClient = adminClient.getSegmentClient();
+
+// List segments for a table
+List<String> segments = segmentClient.listSegments("myTable_OFFLINE", false);
+
+// Get segment metadata
+Map<String, Object> metadata = 
segmentClient.getSegmentMetadata("myTable_OFFLINE", "segmentName", null);
+
+// Delete a segment
+String deleteResult = segmentClient.deleteSegment("myTable_OFFLINE", 
"segmentName", "7d");
+```
+
+### Tenant Operations
+
+```java
+PinotTenantAdminClient tenantClient = adminClient.getTenantClient();
+
+// List all tenants
+List<String> tenants = tenantClient.listTenants();
+
+// Create a tenant
+String createResult = tenantClient.createTenant(tenantConfigJson);
+
+// Get tenant metadata
+String metadata = tenantClient.getTenantMetadata("myTenant", "SERVER");
+
+// Rebalance tenant tables
+String rebalanceResult = tenantClient.rebalanceTenant("myTenant", 2, null, 
null, "default");
+```
+
+### Task Operations
+
+```java
+PinotTaskAdminClient taskClient = adminClient.getTaskClient();
+
+// List task types
+Set<String> taskTypes = taskClient.listTaskTypes();
+
+// Get tasks for a specific type
+Set<String> tasks = taskClient.getTasks("SegmentReloadTask");
+
+// Get task state
+TaskState state = taskClient.getTaskState("taskName");
+
+// Get task debug information
+String debugInfo = taskClient.getTaskDebugInfo("taskName", 0, null);
+```
+
+## Authentication
+
+The client supports multiple authentication methods:
+
+### Basic Authentication
+
+```java
+PinotAdminClient client = new PinotAdminClient("localhost:9000", properties,
+    PinotAdminAuthentication.AuthType.BASIC,
+    Map.of("username", "admin", "password", "password"));
+```
+
+### Bearer Token Authentication
+
+```java
+PinotAdminClient client = new PinotAdminClient("localhost:9000", properties,
+    PinotAdminAuthentication.AuthType.BEARER,
+    Map.of("token", "your-jwt-token"));
+```
+
+### Custom Authentication
+
+```java
+Map<String, String> customHeaders = Map.of(
+    "X-API-Key", "your-api-key",
+    "X-Tenant-Id", "tenant-123"
+);
+
+PinotAdminClient client = new PinotAdminClient("localhost:9000", properties,
+    PinotAdminAuthentication.AuthType.CUSTOM, customHeaders);
+```
+
+## Exception Handling
+
+The client provides specific exception types:
+
+- `PinotAdminException`: General admin operation errors
+- `PinotAdminAuthenticationException`: Authentication failures
+- `PinotAdminNotFoundException`: Resource not found errors
+- `PinotAdminValidationException`: Validation failures
+
+```java
+try {
+  adminClient.getTableClient().getTableConfig("nonexistent");
+} catch (PinotAdminNotFoundException e) {
+  System.out.println("Table not found: " + e.getMessage());
+} catch (PinotAdminAuthenticationException e) {
+  System.out.println("Authentication failed: " + e.getMessage());
+} catch (PinotAdminException e) {
+  System.out.println("Admin operation failed: " + e.getMessage());
+}
+```
+
+## Configuration
+
+The client can be configured through properties:
+
+```java
+Properties properties = new Properties();
+properties.setProperty("pinot.admin.request.timeout.ms", "60000");
+properties.setProperty("pinot.admin.scheme", "https");
+
+// Create client with configuration
+PinotAdminClient client = new PinotAdminClient("localhost:9000", properties);
+```
+
+## Building
+
+The admin client is part of the `pinot-java-client` module. Build it along 
with the rest of Pinot:
+
+```bash
+mvn clean compile -DskipTests
+```
+
+## Testing
+
+Run the integration tests:
+
+```bash
+mvn test -Dtest=PinotAdminClientTest
+```
+
+Note: Integration tests require a running Pinot cluster.
+
+## Contributing
+
+When adding new API methods:
+
+1. Add the method to the appropriate service client
+2. Include proper exception handling
+3. Add async version if the operation is long-running
+4. Update documentation and examples
+5. Add corresponding tests
+
+## API Coverage
+
+The admin client currently covers all major Pinot controller APIs:
+
+- ✅ Table management (CRUD, status, metadata, rebalance)
+- ✅ Schema management (CRUD, validation)
+- ✅ Instance management (CRUD, state management)
+- ✅ Segment operations (query, metadata, deletion)
+- ✅ Tenant management (CRUD, configuration, rebalance)
+- ✅ Task management (monitoring, debugging)
+
+This provides a complete programmatic interface for Pinot cluster 
administration.
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
new file mode 100644
index 00000000000..78697101b7e
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Unit tests for PinotAdminClient using mocked transport (no real server).
+ */
+public class PinotAdminClientTest {
+  @Mock
+  private PinotAdminTransport _mockTransport;
+
+  private PinotAdminClient _adminClient;
+  private static final String CONTROLLER_ADDRESS = "localhost:9000";
+  private static final Map<String, String> HEADERS = Map.of("Authorization", 
"Bearer token");
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    MockitoAnnotations.openMocks(this);
+    _adminClient = new PinotAdminClient(CONTROLLER_ADDRESS, _mockTransport, 
HEADERS);
+
+    // For helper methods on the transport, call real implementations so 
parsing works
+    lenient().when(_mockTransport.parseStringArray(any(), 
anyString())).thenCallRealMethod();
+    lenient().when(_mockTransport.parseStringArraySafe(any(), 
anyString())).thenCallRealMethod();
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    if (_adminClient != null) {
+      _adminClient.close();
+    }
+  }
+
+  @Test
+  public void testListTables()
+      throws Exception {
+    String jsonResponse = "{\"tables\": [\"tbl1\", \"tbl2\"]}";
+    JsonNode mockResponse = new ObjectMapper().readTree(jsonResponse);
+
+    lenient().when(_mockTransport.executeGet(anyString(), anyString(), any(), 
any()))
+        .thenReturn(mockResponse);
+
+    List<String> tables = _adminClient.getTableClient().listTables(null, null, 
null);
+
+    assertNotNull(tables);
+    assertEquals(tables.size(), 2);
+    assertEquals(tables.get(0), "tbl1");
+  }
+
+  @Test
+  public void testGetTableConfig()
+      throws Exception {
+    String jsonResponse = "{\"tableName\":\"tbl1_OFFLINE\"}";
+    JsonNode mockResponse = new ObjectMapper().readTree(jsonResponse);
+    lenient().when(_mockTransport.executeGet(anyString(), anyString(), any(), 
any()))
+        .thenReturn(mockResponse);
+
+    String cfg = _adminClient.getTableClient().getTableConfig("tbl1_OFFLINE");
+    assertNotNull(cfg);
+    assertEquals(new ObjectMapper().readTree(cfg).get("tableName").asText(), 
"tbl1_OFFLINE");
+  }
+
+  @Test
+  public void testListSchemas()
+      throws Exception {
+    String jsonResponse = "{\"schemas\": [\"sch1\", \"sch2\"]}";
+    JsonNode mockResponse = new ObjectMapper().readTree(jsonResponse);
+    lenient().when(_mockTransport.executeGet(anyString(), anyString(), any(), 
any()))
+        .thenReturn(mockResponse);
+
+    List<String> schemas = _adminClient.getSchemaClient().listSchemaNames();
+    assertNotNull(schemas);
+    assertEquals(schemas.size(), 2);
+    assertEquals(schemas.get(1), "sch2");
+  }
+
+  @Test
+  public void testAsyncListSchemas()
+      throws Exception {
+    String jsonResponse = "{\"schemas\": [\"sch1\"]}";
+    JsonNode mockResponse = new ObjectMapper().readTree(jsonResponse);
+    CompletableFuture<JsonNode> jsonNodeCompletableFuture = 
CompletableFuture.completedFuture(mockResponse);
+    lenient().when(_mockTransport.executeGetAsync(anyString(), anyString(), 
any(), any()))
+        .thenReturn(jsonNodeCompletableFuture);
+
+    List<String> schemas = 
_adminClient.getSchemaClient().listSchemaNamesAsync().get();
+    assertNotNull(schemas);
+    assertEquals(schemas.size(), 1);
+    assertEquals(schemas.get(0), "sch1");
+  }
+
+  @Test
+  public void testCreateTable()
+      throws Exception {
+    JsonNode mockResponse = new ObjectMapper().readTree("{\"status\":\"OK\"}");
+    lenient().when(_mockTransport.executePost(anyString(), anyString(), any(), 
any(), any()))
+        .thenReturn(mockResponse);
+
+    String resp = _adminClient.getTableClient().createTable("{}", null);
+    assertEquals(new ObjectMapper().readTree(resp).get("status").asText(), 
"OK");
+  }
+
+  @Test
+  public void testDeleteTable()
+      throws Exception {
+    JsonNode mockResponse = new 
ObjectMapper().readTree("{\"status\":\"DELETED\"}");
+    lenient().when(_mockTransport.executeDelete(anyString(), anyString(), 
any(), any()))
+        .thenReturn(mockResponse);
+
+    String resp = _adminClient.getTableClient().deleteTable("tbl1_OFFLINE");
+    assertEquals(new ObjectMapper().readTree(resp).get("status").asText(), 
"DELETED");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to