This is an automated email from the ASF dual-hosted git repository.

yuqi1129 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c5dc910e1 [#10828] feat(catalog-glue): Implement schema and table 
CRUD operations (#10829)
3c5dc910e1 is described below

commit 3c5dc910e17e1394b21f62f2b2903c511a83da88
Author: Yuhui <[email protected]>
AuthorDate: Wed Apr 29 21:57:48 2026 +0800

    [#10828] feat(catalog-glue): Implement schema and table CRUD operations 
(#10829)
    
    ### What changes were proposed in this pull request?
    
    Implement schema CRUD, table CRUD, partition support, and exception
    mapping for the catalog-glue module:
    
    - `GlueExceptionConverter` — maps
    `EntityNotFoundException`/`AlreadyExistsException`/`InvalidInputException`
    to Gravitino exceptions
    - `GlueCatalogOperations` — implements `SupportsSchemas` and
    `TableCatalog` backed by AWS Glue API
    - `GlueTableOperations` — implements `SupportsPartitions` backed by Glue
    partition APIs
    - `GlueTable` — `newOps()` wired to return `GlueTableOperations`
    
    ### Why are the changes needed?
    
    These classes implement the core metadata operations required to make
    the catalog-glue module functional.
    
    Fix: #10828
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is internal implementation of the catalog-glue module.
    
    ### How was this patch tested?
    
    Unit tests covering schema/table/partition CRUD with mocked
    `GlueClient`.
---
 .../catalog/glue/GlueCatalogOperations.java        | 668 ++++++++++++++++++++-
 .../catalog/glue/GlueExceptionConverter.java       |  74 +++
 .../apache/gravitino/catalog/glue/GlueTable.java   |  40 +-
 .../catalog/glue/GlueTableOperations.java          | 270 +++++++++
 .../catalog/glue/TestGlueCatalogSchema.java        | 648 ++++++++++++++++++++
 .../catalog/glue/TestGlueTableOperations.java      | 235 ++++++++
 6 files changed, 1923 insertions(+), 12 deletions(-)

diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
index 22a2175f7b..70f1d5278d 100644
--- 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
@@ -18,32 +18,123 @@
  */
 package org.apache.gravitino.catalog.glue;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Order;
+import software.amazon.awssdk.services.glue.model.SerDeInfo;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
 
 /**
  * Operations implementation for the AWS Glue Data Catalog connector.
  *
- * <p>This is a scaffold stub. Full implementation will be added in PR-02 
through PR-06.
- *
- * <p>TODO PR-04: implement SupportsSchemas (Schema CRUD + exception mapping)
- *
- * <p>TODO PR-05: implement TableCatalog (Table CRUD + type detection)
+ * <p>Implements schema CRUD (via {@link SupportsSchemas}) and table CRUD (via 
{@link TableCatalog})
+ * backed by the AWS Glue API.
  */
-public class GlueCatalogOperations implements CatalogOperations {
+public class GlueCatalogOperations implements CatalogOperations, 
SupportsSchemas, TableCatalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalogOperations.class);
+
+  /** Table property keys that map to StorageDescriptor fields, not to 
Table.parameters(). */
+  private static final Set<String> SD_TABLE_PROPERTY_KEYS =
+      ImmutableSet.of(
+          GlueConstants.LOCATION,
+          GlueConstants.INPUT_FORMAT,
+          GlueConstants.OUTPUT_FORMAT,
+          GlueConstants.SERDE_LIB,
+          GlueConstants.SERDE_NAME);
+
+  /** Property keys that map to top-level TableInput fields, not to 
Table.parameters(). */
+  private static final Set<String> TABLE_LEVEL_KEYS = 
ImmutableSet.of(GlueConstants.TABLE_TYPE);
+
+  @VisibleForTesting GlueClient glueClient;
+
+  /** Nullable — when null, Glue uses the caller's AWS account ID. */
+  @VisibleForTesting String catalogId;
 
-  // TODO PR-04: add GlueClient field and catalogId
+  /** Nullable — when null all table formats are exposed. */
+  @VisibleForTesting Set<String> tableFormatFilter;
+
+  private final GlueTypeConverter typeConverter = new GlueTypeConverter();
 
   @Override
   public void initialize(
       Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
       throws RuntimeException {
-    // TODO PR-04: build GlueClient via GlueClientProvider and store catalogId
+    this.glueClient = GlueClientProvider.buildClient(config);
+    this.catalogId = config.get(GlueConstants.AWS_GLUE_CATALOG_ID);
+    String filterProp =
+        config.getOrDefault(
+            GlueConstants.TABLE_FORMAT_FILTER, 
GlueConstants.DEFAULT_TABLE_FORMAT_FILTER);
+    if 
(!GlueConstants.DEFAULT_TABLE_FORMAT_FILTER.equalsIgnoreCase(filterProp)) {
+      tableFormatFilter =
+          Arrays.stream(filterProp.split(","))
+              .map(String::trim)
+              .map(s -> s.toLowerCase(Locale.ROOT))
+              .collect(Collectors.toSet());
+    }
   }
 
   @Override
@@ -54,11 +145,568 @@ public class GlueCatalogOperations implements 
CatalogOperations {
       String comment,
       Map<String, String> properties)
       throws Exception {
-    // TODO PR-04: call GlueClient.getDatabases() to verify connectivity
+    try {
+      GetDatabasesRequest.Builder req = 
GetDatabasesRequest.builder().maxResults(1);
+      applyCatalogId(catalogId, req::catalogId);
+      glueClient.getDatabases(req.build());
+    } catch (GlueException e) {
+      throw new ConnectionFailedException(e, "Failed to connect to AWS Glue: 
%s", e.getMessage());
+    }
   }
 
   @Override
   public void close() throws IOException {
-    // TODO PR-04: close GlueClient
+    if (glueClient != null) {
+      glueClient.close();
+      glueClient = null;
+    }
+  }
+
+  @Override
+  public NameIdentifier[] listSchemas(Namespace namespace) throws 
NoSuchCatalogException {
+    List<NameIdentifier> result = new ArrayList<>();
+    String nextToken = null;
+    try {
+      do {
+        GetDatabasesRequest.Builder req = GetDatabasesRequest.builder();
+        applyCatalogId(catalogId, req::catalogId);
+        if (nextToken != null) req.nextToken(nextToken);
+        GetDatabasesResponse resp = glueClient.getDatabases(req.build());
+        resp.databaseList().stream()
+            .map(db -> NameIdentifier.of(namespace, db.name()))
+            .forEach(result::add);
+        nextToken = resp.nextToken();
+      } while (nextToken != null);
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "listing schemas under 
" + namespace);
+    }
+    return result.toArray(new NameIdentifier[0]);
+  }
+
+  @Override
+  public GlueSchema createSchema(
+      NameIdentifier ident, String comment, Map<String, String> properties)
+      throws NoSuchCatalogException, SchemaAlreadyExistsException {
+
+    Map<String, String> params = properties != null ? properties : 
Collections.emptyMap();
+
+    DatabaseInput input =
+        
DatabaseInput.builder().name(ident.name()).description(comment).parameters(params).build();
+
+    CreateDatabaseRequest.Builder req = 
CreateDatabaseRequest.builder().databaseInput(input);
+    applyCatalogId(catalogId, req::catalogId);
+
+    try {
+      glueClient.createDatabase(req.build());
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + 
ident.name());
+    }
+
+    LOG.info("Created Glue schema (database) {}", ident.name());
+
+    return GlueSchema.builder()
+        .withName(ident.name())
+        .withComment(comment)
+        .withProperties(params)
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(PrincipalUtils.getCurrentUserName())
+                .withCreateTime(Instant.now())
+                .build())
+        .build();
+  }
+
+  @Override
+  public GlueSchema loadSchema(NameIdentifier ident) throws 
NoSuchSchemaException {
+    GetDatabaseRequest.Builder req = 
GetDatabaseRequest.builder().name(ident.name());
+    applyCatalogId(catalogId, req::catalogId);
+    try {
+      GlueSchema schema =
+          
GlueSchema.fromGlueDatabase(glueClient.getDatabase(req.build()).database());
+      LOG.info("Loaded Glue schema (database) {}", ident.name());
+      return schema;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + 
ident.name());
+    }
+  }
+
+  @Override
+  public GlueSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
+      throws NoSuchSchemaException {
+
+    GlueSchema current = loadSchema(ident);
+
+    Map<String, String> newProps = new HashMap<>(current.properties());
+
+    for (SchemaChange change : changes) {
+      if (change instanceof SchemaChange.SetProperty) {
+        SchemaChange.SetProperty sp = (SchemaChange.SetProperty) change;
+        newProps.put(sp.getProperty(), sp.getValue());
+      } else if (change instanceof SchemaChange.RemoveProperty) {
+        newProps.remove(((SchemaChange.RemoveProperty) change).getProperty());
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported schema change: " + change.getClass().getSimpleName());
+      }
+    }
+
+    DatabaseInput input =
+        DatabaseInput.builder()
+            .name(ident.name())
+            .description(current.comment())
+            .parameters(newProps)
+            .build();
+
+    UpdateDatabaseRequest.Builder req =
+        
UpdateDatabaseRequest.builder().name(ident.name()).databaseInput(input);
+    applyCatalogId(catalogId, req::catalogId);
+
+    try {
+      glueClient.updateDatabase(req.build());
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + 
ident.name());
+    }
+
+    LOG.info("Altered Glue schema (database) {}", ident.name());
+
+    return GlueSchema.builder()
+        .withName(ident.name())
+        .withComment(current.comment())
+        .withProperties(newProps)
+        .withAuditInfo(current.auditInfo())
+        .build();
+  }
+
+  @Override
+  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws 
NonEmptySchemaException {
+    if (!cascade) {
+      GetTablesRequest.Builder tabReq =
+          GetTablesRequest.builder().databaseName(ident.name()).maxResults(1);
+      applyCatalogId(catalogId, tabReq::catalogId);
+      try {
+        if (!glueClient.getTables(tabReq.build()).tableList().isEmpty()) {
+          throw new NonEmptySchemaException(
+              "Schema %s is not empty. Use cascade=true to drop it with its 
tables.", ident.name());
+        }
+      } catch (GlueException e) {
+        throw GlueExceptionConverter.toSchemaException(
+            e, "checking tables in schema " + ident.name());
+      }
+    }
+
+    DeleteDatabaseRequest.Builder req = 
DeleteDatabaseRequest.builder().name(ident.name());
+    applyCatalogId(catalogId, req::catalogId);
+    try {
+      glueClient.deleteDatabase(req.build());
+      LOG.info("Dropped Glue schema (database) {}", ident.name());
+      return true;
+    } catch (EntityNotFoundException e) {
+      return false;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + 
ident.name());
+    }
+  }
+
+  @Override
+  public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
+    String dbName = schemaName(namespace);
+    List<NameIdentifier> result = new ArrayList<>();
+    String nextToken = null;
+    try {
+      do {
+        GetTablesRequest.Builder req = 
GetTablesRequest.builder().databaseName(dbName);
+        applyCatalogId(catalogId, req::catalogId);
+        if (nextToken != null) req.nextToken(nextToken);
+        GetTablesResponse resp = glueClient.getTables(req.build());
+        resp.tableList().stream()
+            .filter(this::matchesFormatFilter)
+            .map(t -> NameIdentifier.of(namespace, t.name()))
+            .forEach(result::add);
+        nextToken = resp.nextToken();
+      } while (nextToken != null);
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchSchemaException(e, "Schema %s does not exist", dbName);
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "listing tables in 
schema " + dbName);
+    }
+    return result.toArray(new NameIdentifier[0]);
+  }
+
+  @Override
+  public GlueTable loadTable(NameIdentifier ident) throws NoSuchTableException 
{
+    String dbName = schemaName(ident.namespace());
+    GetTableRequest.Builder req = 
GetTableRequest.builder().databaseName(dbName).name(ident.name());
+    applyCatalogId(catalogId, req::catalogId);
+    try {
+      GlueTable table =
+          GlueTable.fromGlueTable(glueClient.getTable(req.build()).table(), 
typeConverter);
+      table.initOpsContext(glueClient, catalogId, dbName);
+      LOG.info("Loaded Glue table {}.{}", dbName, ident.name());
+      return table;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + 
ident.name());
+    }
+  }
+
+  @Override
+  public GlueTable createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+
+    Preconditions.checkArgument(indexes.length == 0, "Glue catalog does not 
support indexes");
+
+    for (Transform t : partitions) {
+      Preconditions.checkArgument(
+          t instanceof Transforms.IdentityTransform,
+          "Glue catalog only supports identity partitioning, got: %s",
+          t.name());
+      Preconditions.checkArgument(
+          ((Transforms.IdentityTransform) t).fieldName().length == 1,
+          "Glue catalog does not support nested field partitioning");
+    }
+
+    String dbName = schemaName(ident.namespace());
+    Map<String, String> props = properties != null ? properties : 
Collections.emptyMap();
+
+    TableInput input =
+        buildTableInput(
+            ident.name(), comment, columns, props, partitions, distribution, 
sortOrders);
+
+    CreateTableRequest.Builder req =
+        CreateTableRequest.builder().databaseName(dbName).tableInput(input);
+    applyCatalogId(catalogId, req::catalogId);
+
+    try {
+      glueClient.createTable(req.build());
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchSchemaException(e, "Schema %s does not exist", dbName);
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + 
ident.name());
+    }
+
+    LOG.info("Created Glue table {}.{}", dbName, ident.name());
+
+    GlueTable created =
+        GlueTable.builder()
+            .withName(ident.name())
+            .withComment(comment)
+            .withColumns(columns)
+            .withProperties(props)
+            .withPartitioning(partitions)
+            .withDistribution(distribution != null ? distribution : 
Distributions.NONE)
+            .withSortOrders(sortOrders != null ? sortOrders : new SortOrder[0])
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(PrincipalUtils.getCurrentUserName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+    created.initOpsContext(glueClient, catalogId, dbName);
+    return created;
+  }
+
+  @Override
+  public GlueTable alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+
+    GlueTable current = loadTable(ident);
+    String dbName = schemaName(ident.namespace());
+
+    String newName = current.name();
+    String newComment = current.comment();
+    Map<String, String> newProps = new HashMap<>(current.properties());
+
+    // Separate data columns from partition columns
+    int partCount = current.partitioning().length;
+    List<Column> allCols = new ArrayList<>(Arrays.asList(current.columns()));
+    List<Column> dataCols = new ArrayList<>(allCols.subList(0, allCols.size() 
- partCount));
+    List<Column> partCols =
+        new ArrayList<>(allCols.subList(allCols.size() - partCount, 
allCols.size()));
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.RenameTable) {
+        TableChange.RenameTable renameTable = (TableChange.RenameTable) change;
+        renameTable
+            .getNewSchemaName()
+            .ifPresent(
+                s -> {
+                  throw new UnsupportedOperationException(
+                      "Glue does not support cross-schema table rename");
+                });
+        newName = renameTable.getNewName();
+      } else if (change instanceof TableChange.UpdateComment) {
+        newComment = ((TableChange.UpdateComment) change).getNewComment();
+      } else if (change instanceof TableChange.SetProperty) {
+        TableChange.SetProperty sp = (TableChange.SetProperty) change;
+        newProps.put(sp.getProperty(), sp.getValue());
+      } else if (change instanceof TableChange.RemoveProperty) {
+        newProps.remove(((TableChange.RemoveProperty) change).getProperty());
+      } else if (change instanceof TableChange.ColumnChange) {
+        applyColumnChange(dataCols, partCols, (TableChange.ColumnChange) 
change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change: " + change.getClass().getSimpleName());
+      }
+    }
+
+    List<Column> newAllCols = new ArrayList<>(dataCols);
+    newAllCols.addAll(partCols);
+    Column[] newColumns = newAllCols.toArray(new Column[0]);
+
+    TableInput input =
+        buildTableInput(
+            newName,
+            newComment,
+            newColumns,
+            newProps,
+            current.partitioning(),
+            current.distribution(),
+            current.sortOrder());
+
+    UpdateTableRequest.Builder req =
+        UpdateTableRequest.builder().databaseName(dbName).tableInput(input);
+    applyCatalogId(catalogId, req::catalogId);
+
+    try {
+      glueClient.updateTable(req.build());
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + 
ident.name());
+    }
+
+    LOG.info("Altered Glue table {}.{}", dbName, ident.name());
+
+    GlueTable altered =
+        GlueTable.builder()
+            .withName(newName)
+            .withComment(newComment)
+            .withColumns(newColumns)
+            .withProperties(newProps)
+            .withPartitioning(current.partitioning())
+            .withDistribution(current.distribution())
+            .withSortOrders(current.sortOrder())
+            .withAuditInfo(current.auditInfo())
+            .build();
+    altered.initOpsContext(glueClient, catalogId, dbName);
+    return altered;
+  }
+
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    String dbName = schemaName(ident.namespace());
+    DeleteTableRequest.Builder req =
+        DeleteTableRequest.builder().databaseName(dbName).name(ident.name());
+    applyCatalogId(catalogId, req::catalogId);
+    try {
+      glueClient.deleteTable(req.build());
+      LOG.info("Dropped Glue table {}.{}", dbName, ident.name());
+      return true;
+    } catch (EntityNotFoundException e) {
+      return false;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + 
ident.name());
+    }
+  }
+
+  private static String schemaName(Namespace namespace) {
+    String[] levels = namespace.levels();
+    Preconditions.checkArgument(
+        levels.length >= 2, "Namespace must have at least 2 levels, got: %s", 
levels.length);
+    return levels[levels.length - 1];
+  }
+
+  // NOTE: parameter type is the Glue SDK Table, not GlueTable (our domain 
class).
+  // The Glue SDK's Column model is also referenced by FQN throughout this 
class because its
+  // simple name conflicts with the imported org.apache.gravitino.rel.Column.
+  private boolean matchesFormatFilter(Table table) {
+    if (tableFormatFilter == null) return true;
+    String fmt = table.hasParameters() ? 
table.parameters().get(GlueConstants.TABLE_FORMAT) : null;
+    String normalized =
+        fmt != null ? fmt.toLowerCase(Locale.ROOT) : 
GlueConstants.DEFAULT_TABLE_FORMAT_VALUE;
+    return tableFormatFilter.contains(normalized);
+  }
+
+  private TableInput buildTableInput(
+      String name,
+      String comment,
+      Column[] columns,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders) {
+
+    int partCount = partitions.length;
+    Preconditions.checkArgument(
+        columns.length >= partCount,
+        "columns.length (%s) must be >= number of partition columns (%s)",
+        columns.length,
+        partCount);
+    int dataCount = columns.length - partCount;
+
+    List<software.amazon.awssdk.services.glue.model.Column> glueDataCols = new 
ArrayList<>();
+    for (int i = 0; i < dataCount; i++) {
+      glueDataCols.add(toGlueColumn(columns[i]));
+    }
+
+    List<software.amazon.awssdk.services.glue.model.Column> gluePartCols = new 
ArrayList<>();
+    for (int i = dataCount; i < columns.length; i++) {
+      gluePartCols.add(toGlueColumn(columns[i]));
+    }
+
+    // Separate properties into: SD fields, table-level fields, and 
Table.parameters()
+    Map<String, String> serdeParams = new HashMap<>();
+    Map<String, String> tableParams = new HashMap<>();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith(GlueConstants.SERDE_PARAMETER_PREFIX)) {
+        serdeParams.put(
+            key.substring(GlueConstants.SERDE_PARAMETER_PREFIX.length()), 
entry.getValue());
+      } else if (!SD_TABLE_PROPERTY_KEYS.contains(key) && 
!TABLE_LEVEL_KEYS.contains(key)) {
+        tableParams.put(key, entry.getValue());
+      }
+    }
+
+    SerDeInfo serDe =
+        SerDeInfo.builder()
+            .serializationLibrary(properties.get(GlueConstants.SERDE_LIB))
+            .name(properties.get(GlueConstants.SERDE_NAME))
+            .parameters(serdeParams)
+            .build();
+
+    List<String> bucketCols = Collections.emptyList();
+    int numBuckets = 0;
+    if (distribution != null && distribution != Distributions.NONE) {
+      numBuckets = distribution.number();
+      bucketCols =
+          Arrays.stream(distribution.expressions())
+              .filter(e -> e instanceof NamedReference.FieldReference)
+              .map(e -> String.join(".", ((NamedReference.FieldReference) 
e).fieldName()))
+              .collect(Collectors.toList());
+    }
+
+    List<Order> glueSortCols = new ArrayList<>();
+    if (sortOrders != null) {
+      for (SortOrder so : sortOrders) {
+        if (so.expression() instanceof NamedReference.FieldReference) {
+          String colName =
+              String.join(".", ((NamedReference.FieldReference) 
so.expression()).fieldName());
+          int order = so.direction() == SortDirection.ASCENDING ? 1 : 0;
+          
glueSortCols.add(Order.builder().column(colName).sortOrder(order).build());
+        }
+      }
+    }
+
+    StorageDescriptor sd =
+        StorageDescriptor.builder()
+            .columns(glueDataCols)
+            .location(properties.get(GlueConstants.LOCATION))
+            .inputFormat(properties.get(GlueConstants.INPUT_FORMAT))
+            .outputFormat(properties.get(GlueConstants.OUTPUT_FORMAT))
+            .serdeInfo(serDe)
+            .bucketColumns(bucketCols)
+            .numberOfBuckets(numBuckets)
+            .sortColumns(glueSortCols)
+            .build();
+
+    return TableInput.builder()
+        .name(name)
+        .description(comment)
+        .tableType(properties.get(GlueConstants.TABLE_TYPE))
+        .parameters(tableParams)
+        .storageDescriptor(sd)
+        .partitionKeys(gluePartCols)
+        .build();
+  }
+
+  private software.amazon.awssdk.services.glue.model.Column 
toGlueColumn(Column col) {
+    return software.amazon.awssdk.services.glue.model.Column.builder()
+        .name(col.name())
+        .type(typeConverter.fromGravitino(col.dataType()))
+        .comment(col.comment())
+        .build();
+  }
+
+  private static void applyColumnChange(
+      List<Column> dataCols, List<Column> partCols, TableChange.ColumnChange 
change) {
+
+    String fieldName = change.fieldName()[0];
+
+    if (change instanceof TableChange.AddColumn) {
+      TableChange.AddColumn add = (TableChange.AddColumn) change;
+      dataCols.add(
+          GlueColumn.builder()
+              .withName(add.fieldName()[0])
+              .withType(add.getDataType())
+              .withComment(add.getComment())
+              .withNullable(add.isNullable())
+              .build());
+
+    } else if (change instanceof TableChange.DeleteColumn) {
+      boolean isPartition = partCols.stream().anyMatch(c -> 
c.name().equals(fieldName));
+      Preconditions.checkArgument(!isPartition, "Cannot delete partition 
column: %s", fieldName);
+      dataCols.removeIf(c -> c.name().equals(fieldName));
+
+    } else if (change instanceof TableChange.RenameColumn) {
+      boolean isPartition = partCols.stream().anyMatch(c -> 
c.name().equals(fieldName));
+      Preconditions.checkArgument(!isPartition, "Cannot rename partition 
column: %s", fieldName);
+      String newColName = ((TableChange.RenameColumn) change).getNewName();
+      replaceColumn(
+          dataCols, fieldName, old -> copyColumn(old, newColName, 
old.dataType(), old.comment()));
+
+    } else if (change instanceof TableChange.UpdateColumnType) {
+      boolean isPartition = partCols.stream().anyMatch(c -> 
c.name().equals(fieldName));
+      Preconditions.checkArgument(
+          !isPartition, "Cannot update type of partition column: %s", 
fieldName);
+      Type newType = ((TableChange.UpdateColumnType) change).getNewDataType();
+      replaceColumn(
+          dataCols, fieldName, old -> copyColumn(old, old.name(), newType, 
old.comment()));
+
+    } else if (change instanceof TableChange.UpdateColumnComment) {
+      String newCmt = ((TableChange.UpdateColumnComment) 
change).getNewComment();
+      if (!replaceColumn(
+          dataCols, fieldName, old -> copyColumn(old, old.name(), 
old.dataType(), newCmt))) {
+        replaceColumn(
+            partCols, fieldName, old -> copyColumn(old, old.name(), 
old.dataType(), newCmt));
+      }
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported column change: " + change.getClass().getSimpleName());
+    }
+  }
+
+  /** Passes {@code catalogId} to {@code setter} when it is non-null. */
+  static void applyCatalogId(String catalogId, Consumer<String> setter) {
+    if (catalogId != null) setter.accept(catalogId);
+  }
+
+  /**
+   * Finds the first column matching {@code name} in {@code cols}, replaces it 
with the result of
+   * {@code updater}, and returns {@code true} if a replacement was made.
+   */
+  private static boolean replaceColumn(
+      List<Column> cols, String name, UnaryOperator<Column> updater) {
+    for (int i = 0; i < cols.size(); i++) {
+      if (cols.get(i).name().equals(name)) {
+        cols.set(i, updater.apply(cols.get(i)));
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static Column copyColumn(Column src, String name, Type type, String 
comment) {
+    return GlueColumn.builder()
+        .withName(name)
+        .withType(type)
+        .withComment(comment)
+        .withNullable(src.nullable())
+        .build();
   }
 }
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java
new file mode 100644
index 0000000000..7857875e0c
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.InvalidInputException;
+
+/** Converts AWS Glue SDK exceptions to Gravitino exceptions. */
+final class GlueExceptionConverter {
+
+  private GlueExceptionConverter() {}
+
+  /**
+   * Converts a {@link GlueException} to the appropriate Gravitino schema 
exception.
+   *
+   * @param e the Glue exception to convert
+   * @param context description of the operation context for error messages
+   * @return a Gravitino or standard Java runtime exception
+   */
+  static RuntimeException toSchemaException(GlueException e, String context) {
+    if (e instanceof EntityNotFoundException) {
+      return new NoSuchSchemaException(e, "%s does not exist", context);
+    }
+    if (e instanceof AlreadyExistsException) {
+      return new SchemaAlreadyExistsException(e, "%s already exists", context);
+    }
+    if (e instanceof InvalidInputException) {
+      return new IllegalArgumentException(context + ": " + e.getMessage(), e);
+    }
+    return new RuntimeException("Glue error: " + context, e);
+  }
+
+  /**
+   * Converts a {@link GlueException} to the appropriate Gravitino table 
exception.
+   *
+   * @param e the Glue exception to convert
+   * @param context description of the operation context for error messages
+   * @return a Gravitino or standard Java runtime exception
+   */
+  static RuntimeException toTableException(GlueException e, String context) {
+    if (e instanceof EntityNotFoundException) {
+      return new NoSuchTableException(e, "%s does not exist", context);
+    }
+    if (e instanceof AlreadyExistsException) {
+      return new TableAlreadyExistsException(e, "%s already exists", context);
+    }
+    if (e instanceof InvalidInputException) {
+      return new IllegalArgumentException(context + ": " + e.getMessage(), e);
+    }
+    return new RuntimeException("Glue error: " + context, e);
+  }
+}
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
index d395caf23c..1134a5ad4f 100644
--- 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
@@ -26,7 +26,9 @@ import static 
org.apache.gravitino.catalog.glue.GlueConstants.SERDE_NAME;
 import static 
org.apache.gravitino.catalog.glue.GlueConstants.SERDE_PARAMETER_PREFIX;
 import static org.apache.gravitino.catalog.glue.GlueConstants.TABLE_TYPE;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +39,7 @@ import org.apache.gravitino.connector.BaseTable;
 import org.apache.gravitino.connector.TableOperations;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.SupportsPartitions;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
@@ -45,6 +48,7 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.sorts.SortOrders;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import software.amazon.awssdk.services.glue.GlueClient;
 import software.amazon.awssdk.services.glue.model.StorageDescriptor;
 import software.amazon.awssdk.services.glue.model.Table;
 
@@ -58,12 +62,44 @@ import software.amazon.awssdk.services.glue.model.Table;
 @ToString
 public class GlueTable extends BaseTable {
 
+  /** Glue client injected after construction to support partition operations. 
*/
+  private GlueClient glueClient;
+
+  /** Nullable — when null, Glue uses the caller's AWS account ID. */
+  private String catalogId;
+
+  /** Database (schema) this table belongs to. */
+  private String dbName;
+
   private GlueTable() {}
 
+  /**
+   * Injects the Glue connection context required for partition operations.
+   *
+   * <p>Called by {@link GlueCatalogOperations} after constructing a table 
instance.
+   */
+  void initOpsContext(GlueClient glueClient, String catalogId, String dbName) {
+    this.glueClient = glueClient;
+    this.catalogId = catalogId;
+    this.dbName = dbName;
+  }
+
   @Override
   protected TableOperations newOps() {
-    throw new UnsupportedOperationException(
-        "Partition operations are not yet supported for GlueTable");
+    Preconditions.checkState(
+        glueClient != null,
+        "Partition operations require Glue context; call initOpsContext() 
first");
+    String[] partColNames =
+        Arrays.stream(partitioning)
+            .filter(t -> t instanceof Transforms.IdentityTransform)
+            .map(t -> ((Transforms.IdentityTransform) t).fieldName()[0])
+            .toArray(String[]::new);
+    return new GlueTableOperations(glueClient, catalogId, dbName, name, 
partColNames);
+  }
+
+  @Override
+  public SupportsPartitions supportPartitions() throws 
UnsupportedOperationException {
+    return (SupportsPartitions) ops();
   }
 
   /**
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java
new file mode 100644
index 0000000000..4891932f29
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.SupportsPartitions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.partitions.IdentityPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+
+/**
+ * Table-level partition operations for the AWS Glue Data Catalog.
+ *
+ * <p>Implements {@link SupportsPartitions} for Hive-format 
identity-partitioned tables. Partition
+ * names follow the Hive convention: {@code col=val/col2=val2}.
+ *
+ * <p>Only {@link IdentityPartition} is supported because the Glue partition 
model is Hive-style
+ * key=value, which maps directly to identity partitions. Other partition 
types (bucket, truncate,
+ * etc.) have no equivalent representation in the Glue partition API.
+ */
+class GlueTableOperations implements TableOperations, SupportsPartitions {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GlueTableOperations.class);
+
+  private final GlueClient glueClient;
+  /** Nullable — when null, Glue uses the caller's AWS account ID. */
+  private final String catalogId;
+
+  private final String dbName;
+  private final String tableName;
+  /** Ordered partition column names, matching the table's {@code 
partitionKeys()} order. */
+  private final String[] partitionColNames;
+
+  GlueTableOperations(
+      GlueClient glueClient,
+      String catalogId,
+      String dbName,
+      String tableName,
+      String[] partitionColNames) {
+    this.glueClient = glueClient;
+    this.catalogId = catalogId;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.partitionColNames = partitionColNames;
+  }
+
+  @Override
+  public String[] listPartitionNames() {
+    List<String> names = new ArrayList<>();
+    String nextToken = null;
+    try {
+      do {
+        GetPartitionsRequest.Builder req =
+            
GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName);
+        GlueCatalogOperations.applyCatalogId(catalogId, req::catalogId);
+        if (nextToken != null) req.nextToken(nextToken);
+        GetPartitionsResponse resp = glueClient.getPartitions(req.build());
+        for (software.amazon.awssdk.services.glue.model.Partition p : 
resp.partitions()) {
+          names.add(buildPartitionName(p.values()));
+        }
+        nextToken = resp.nextToken();
+      } while (nextToken != null);
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to list partitions for table " + 
tableName, e);
+    }
+    return names.toArray(new String[0]);
+  }
+
+  @Override
+  public Partition[] listPartitions() {
+    List<Partition> partitions = new ArrayList<>();
+    String nextToken = null;
+    try {
+      do {
+        GetPartitionsRequest.Builder req =
+            
GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName);
+        GlueCatalogOperations.applyCatalogId(catalogId, req::catalogId);
+        if (nextToken != null) req.nextToken(nextToken);
+        GetPartitionsResponse resp = glueClient.getPartitions(req.build());
+        for (software.amazon.awssdk.services.glue.model.Partition p : 
resp.partitions()) {
+          partitions.add(toGravitinoPartition(p));
+        }
+        nextToken = resp.nextToken();
+      } while (nextToken != null);
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to list partitions for table " + 
tableName, e);
+    }
+    return partitions.toArray(new Partition[0]);
+  }
+
+  @Override
+  public Partition getPartition(String partitionName) throws 
NoSuchPartitionException {
+    List<String> values = parsePartitionName(partitionName);
+    GetPartitionRequest.Builder req =
+        GetPartitionRequest.builder()
+            .databaseName(dbName)
+            .tableName(tableName)
+            .partitionValues(values);
+    GlueCatalogOperations.applyCatalogId(catalogId, req::catalogId);
+    try {
+      return 
toGravitinoPartition(glueClient.getPartition(req.build()).partition());
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchPartitionException(
+          e, "Partition %s does not exist in table %s", partitionName, 
tableName);
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to get partition " + partitionName, 
e);
+    }
+  }
+
+  @Override
+  public Partition addPartition(Partition partition) throws 
PartitionAlreadyExistsException {
+    Preconditions.checkArgument(
+        partition instanceof IdentityPartition, "Glue only supports identity 
partitions");
+    IdentityPartition ip = (IdentityPartition) partition;
+    Preconditions.checkArgument(
+        ip.values().length == partitionColNames.length,
+        "Partition values count (%s) must match partition columns count (%s)",
+        ip.values().length,
+        partitionColNames.length);
+
+    List<String> values = new ArrayList<>(ip.values().length);
+    for (Literal<?> v : ip.values()) {
+      values.add(v.value() != null ? v.value().toString() : null);
+    }
+
+    PartitionInput input =
+        PartitionInput.builder()
+            .values(values)
+            .parameters(partition.properties())
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .build();
+
+    CreatePartitionRequest.Builder req =
+        CreatePartitionRequest.builder()
+            .databaseName(dbName)
+            .tableName(tableName)
+            .partitionInput(input);
+    GlueCatalogOperations.applyCatalogId(catalogId, req::catalogId);
+
+    try {
+      glueClient.createPartition(req.build());
+    } catch (AlreadyExistsException e) {
+      throw new PartitionAlreadyExistsException(
+          e, "Partition %s already exists in table %s", partition.name(), 
tableName);
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to add partition " + 
partition.name(), e);
+    }
+
+    LOG.info("Added partition {} to {}.{}", partition.name(), dbName, 
tableName);
+    return Partitions.identity(
+        partition.name(), ip.fieldNames(), ip.values(), 
partition.properties());
+  }
+
+  @Override
+  public void close() {
+    // GlueClient lifecycle is managed by GlueCatalogOperations; nothing to 
close here.
+  }
+
+  @Override
+  public boolean dropPartition(String partitionName) {
+    List<String> values = parsePartitionName(partitionName);
+    DeletePartitionRequest.Builder req =
+        DeletePartitionRequest.builder()
+            .databaseName(dbName)
+            .tableName(tableName)
+            .partitionValues(values);
+    GlueCatalogOperations.applyCatalogId(catalogId, req::catalogId);
+    try {
+      glueClient.deletePartition(req.build());
+      LOG.info("Dropped partition {} from {}.{}", partitionName, dbName, 
tableName);
+      return true;
+    } catch (EntityNotFoundException e) {
+      return false;
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to drop partition " + partitionName, 
e);
+    }
+  }
+
+  /**
+   * Builds a Hive-style partition name (e.g. {@code 
dt=2024-01-01/country=us}) from an ordered list
+   * of Glue partition values.
+   */
+  private String buildPartitionName(List<String> values) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < partitionColNames.length && i < values.size(); i++) {
+      if (i > 0) sb.append('/');
+      sb.append(partitionColNames[i]).append('=').append(values.get(i));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Parses a Hive-style partition name (e.g. {@code 
dt=2024-01-01/country=us}) into an ordered list
+   * of values, validating that the keys match the table's partition columns 
in order.
+   */
+  private List<String> parsePartitionName(String partitionName) {
+    String[] parts = partitionName.split("/");
+    Preconditions.checkArgument(
+        parts.length == partitionColNames.length,
+        "Partition name '%s' has %s segment(s) but table has %s partition 
column(s)",
+        partitionName,
+        parts.length,
+        partitionColNames.length);
+    List<String> values = new ArrayList<>(parts.length);
+    for (int i = 0; i < parts.length; i++) {
+      int eq = parts[i].indexOf('=');
+      Preconditions.checkArgument(
+          eq >= 0 && parts[i].substring(0, eq).equals(partitionColNames[i]),
+          "Partition segment '%s' does not match expected column '%s'",
+          parts[i],
+          partitionColNames[i]);
+      values.add(parts[i].substring(eq + 1));
+    }
+    return values;
+  }
+
+  private IdentityPartition toGravitinoPartition(
+      software.amazon.awssdk.services.glue.model.Partition gluePartition) {
+    List<String> values = gluePartition.values();
+    String name = buildPartitionName(values);
+
+    String[][] fieldNames = new String[partitionColNames.length][];
+    Literal<?>[] literals = new Literal<?>[partitionColNames.length];
+    for (int i = 0; i < partitionColNames.length; i++) {
+      fieldNames[i] = new String[] {partitionColNames[i]};
+      literals[i] = Literals.stringLiteral(i < values.size() ? values.get(i) : 
null);
+    }
+    Map<String, String> props =
+        gluePartition.hasParameters() ? gluePartition.parameters() : 
Collections.emptyMap();
+    return Partitions.identity(name, fieldNames, literals, props);
+  }
+}
diff --git 
a/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogSchema.java
 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogSchema.java
new file mode 100644
index 0000000000..487d120684
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogSchema.java
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+
+class TestGlueCatalogSchema {
+
+  private GlueCatalogOperations ops;
+  private GlueClient mockClient;
+
+  @BeforeEach
+  void setup() {
+    mockClient = mock(GlueClient.class);
+    ops = new GlueCatalogOperations();
+    ops.glueClient = mockClient;
+  }
+
+  // -------------------------------------------------------------------------
+  // listSchemas
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testListSchemas() {
+    Namespace ns = Namespace.of("metalake", "catalog");
+    Database db1 = Database.builder().name("db1").build();
+    Database db2 = Database.builder().name("db2").build();
+    Database db3 = Database.builder().name("db3").build();
+    Database db4 = Database.builder().name("db4").build();
+
+    when(mockClient.getDatabases(any(GetDatabasesRequest.class)))
+        .thenReturn(
+            GetDatabasesResponse.builder().databaseList(db1, 
db2).nextToken("token1").build())
+        .thenReturn(GetDatabasesResponse.builder().databaseList(db3, 
db4).nextToken(null).build());
+
+    NameIdentifier[] result = ops.listSchemas(ns);
+
+    assertEquals(4, result.length);
+    assertEquals("db1", result[0].name());
+    assertEquals("db4", result[3].name());
+  }
+
+  @Test
+  void testListSchemasReturnsEmptyArray() {
+    Namespace ns = Namespace.of("metalake", "catalog");
+    when(mockClient.getDatabases(any(GetDatabasesRequest.class)))
+        .thenReturn(
+            GetDatabasesResponse.builder()
+                .databaseList(Collections.emptyList())
+                .nextToken(null)
+                .build());
+
+    assertEquals(0, ops.listSchemas(ns).length);
+  }
+
+  // -------------------------------------------------------------------------
+  // createSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testCreateSchema() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    Map<String, String> props = Map.of("k", "v");
+
+    GlueSchema schema = ops.createSchema(ident, "my comment", props);
+
+    verify(mockClient).createDatabase(any(CreateDatabaseRequest.class));
+    assertEquals("mydb", schema.name());
+    assertEquals("my comment", schema.comment());
+    assertEquals(props, schema.properties());
+  }
+
+  @Test
+  void testCreateSchemaAlreadyExists() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    when(mockClient.createDatabase(any(CreateDatabaseRequest.class)))
+        .thenThrow(AlreadyExistsException.builder().message("exists").build());
+
+    assertThrows(
+        SchemaAlreadyExistsException.class,
+        () -> ops.createSchema(ident, null, Collections.emptyMap()));
+  }
+
+  // -------------------------------------------------------------------------
+  // loadSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testLoadSchema() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    Database db =
+        Database.builder()
+            .name("mydb")
+            .description("desc")
+            .parameters(Map.of("k", "v"))
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenReturn(GetDatabaseResponse.builder().database(db).build());
+
+    GlueSchema schema = ops.loadSchema(ident);
+
+    assertEquals("mydb", schema.name());
+    assertEquals("desc", schema.comment());
+    assertEquals(Map.of("k", "v"), schema.properties());
+  }
+
+  @Test
+  void testLoadSchemaNotFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "missing");
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertThrows(NoSuchSchemaException.class, () -> ops.loadSchema(ident));
+  }
+
+  // -------------------------------------------------------------------------
+  // alterSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testAlterSchemaSetProperty() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    Database db =
+        Database.builder()
+            .name("mydb")
+            .parameters(Map.of("existing", "val"))
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenReturn(GetDatabaseResponse.builder().database(db).build());
+    when(mockClient.updateDatabase(any(UpdateDatabaseRequest.class)))
+        .thenReturn(UpdateDatabaseResponse.builder().build());
+
+    ArgumentCaptor<UpdateDatabaseRequest> captor =
+        ArgumentCaptor.forClass(UpdateDatabaseRequest.class);
+
+    GlueSchema result = ops.alterSchema(ident, 
SchemaChange.setProperty("newKey", "newVal"));
+
+    verify(mockClient).updateDatabase(captor.capture());
+    Map<String, String> sentParams = 
captor.getValue().databaseInput().parameters();
+    assertEquals("val", sentParams.get("existing"));
+    assertEquals("newVal", sentParams.get("newKey"));
+    assertEquals("newVal", result.properties().get("newKey"));
+  }
+
+  @Test
+  void testAlterSchemaRemoveProperty() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    Database db =
+        Database.builder()
+            .name("mydb")
+            .parameters(Map.of("toRemove", "v"))
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenReturn(GetDatabaseResponse.builder().database(db).build());
+    when(mockClient.updateDatabase(any(UpdateDatabaseRequest.class)))
+        .thenReturn(UpdateDatabaseResponse.builder().build());
+
+    ArgumentCaptor<UpdateDatabaseRequest> captor =
+        ArgumentCaptor.forClass(UpdateDatabaseRequest.class);
+
+    ops.alterSchema(ident, SchemaChange.removeProperty("toRemove"));
+
+    verify(mockClient).updateDatabase(captor.capture());
+    
assertFalse(captor.getValue().databaseInput().parameters().containsKey("toRemove"));
+  }
+
+  @Test
+  void testAlterSchemaUnsupportedChange() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    Database db =
+        
Database.builder().name("mydb").parameters(Map.of()).createTime(Instant.now()).build();
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenReturn(GetDatabaseResponse.builder().database(db).build());
+
+    SchemaChange unsupported = mock(SchemaChange.class);
+
+    assertThrows(IllegalArgumentException.class, () -> ops.alterSchema(ident, 
unsupported));
+  }
+
+  @Test
+  void testAlterSchemaNotFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "missing");
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertThrows(
+        NoSuchSchemaException.class,
+        () -> ops.alterSchema(ident, SchemaChange.setProperty("k", "v")));
+  }
+
+  // -------------------------------------------------------------------------
+  // dropSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testDropSchema() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(GetTablesResponse.builder().tableList(List.of()).build());
+
+    boolean dropped = ops.dropSchema(ident, false);
+
+    verify(mockClient).deleteDatabase(any(DeleteDatabaseRequest.class));
+    assertTrue(dropped);
+  }
+
+  @Test
+  void testDropSchemaNonEmpty() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    Table t = Table.builder().name("t1").build();
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(GetTablesResponse.builder().tableList(t).build());
+
+    assertThrows(NonEmptySchemaException.class, () -> ops.dropSchema(ident, 
false));
+    verify(mockClient, 
never()).deleteDatabase(any(DeleteDatabaseRequest.class));
+  }
+
+  @Test
+  void testDropSchemaCascadeTrue() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+
+    boolean dropped = ops.dropSchema(ident, true);
+
+    verify(mockClient, never()).getTables(any(GetTablesRequest.class));
+    verify(mockClient).deleteDatabase(any(DeleteDatabaseRequest.class));
+    assertTrue(dropped);
+  }
+
+  @Test
+  void testDropSchemaNotFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "missing");
+    when(mockClient.deleteDatabase(any(DeleteDatabaseRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertFalse(ops.dropSchema(ident, true));
+  }
+
+  @Test
+  void testDropSchemaWithCatalogId() {
+    ops.catalogId = "123456789012";
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb");
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(GetTablesResponse.builder().tableList(List.of()).build());
+
+    ArgumentCaptor<DeleteDatabaseRequest> captor =
+        ArgumentCaptor.forClass(DeleteDatabaseRequest.class);
+
+    ops.dropSchema(ident, false);
+
+    verify(mockClient).deleteDatabase(captor.capture());
+    assertEquals("123456789012", captor.getValue().catalogId());
+  }
+
+  // -------------------------------------------------------------------------
+  // listTables
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testListTables() {
+    Namespace ns = Namespace.of("metalake", "catalog", "mydb");
+    Table t1 = Table.builder().name("t1").build();
+    Table t2 = Table.builder().name("t2").build();
+    Table t3 = Table.builder().name("t3").build();
+
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(GetTablesResponse.builder().tableList(t1, 
t2).nextToken("tok").build())
+        
.thenReturn(GetTablesResponse.builder().tableList(t3).nextToken(null).build());
+
+    NameIdentifier[] result = ops.listTables(ns);
+
+    assertEquals(3, result.length);
+    assertEquals("t1", result[0].name());
+    assertEquals("t3", result[2].name());
+  }
+
+  @Test
+  void testListTablesSchemaNotFound() {
+    Namespace ns = Namespace.of("metalake", "catalog", "missing");
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertThrows(NoSuchSchemaException.class, () -> ops.listTables(ns));
+  }
+
+  @Test
+  void testListTablesWithFormatFilter() {
+    ops.tableFormatFilter = java.util.Set.of("iceberg");
+    Namespace ns = Namespace.of("metalake", "catalog", "mydb");
+
+    Table icebergTable =
+        Table.builder()
+            .name("ice_tbl")
+            .parameters(Map.of(GlueConstants.TABLE_FORMAT, "ICEBERG"))
+            .build();
+    Table hiveTable = 
Table.builder().name("hive_tbl").parameters(Collections.emptyMap()).build();
+
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(
+            GetTablesResponse.builder().tableList(icebergTable, 
hiveTable).nextToken(null).build());
+
+    NameIdentifier[] result = ops.listTables(ns);
+
+    assertEquals(1, result.length);
+    assertEquals("ice_tbl", result[0].name());
+  }
+
+  // -------------------------------------------------------------------------
+  // loadTable
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testLoadTable() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"mytable");
+    Table glueTable =
+        Table.builder()
+            .name("mytable")
+            .description("desc")
+            .storageDescriptor(
+                StorageDescriptor.builder()
+                    .columns(
+                        
software.amazon.awssdk.services.glue.model.Column.builder()
+                            .name("id")
+                            .type("bigint")
+                            .build())
+                    .build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+
+    GlueTable result = ops.loadTable(ident);
+
+    assertEquals("mytable", result.name());
+    assertEquals("desc", result.comment());
+    assertEquals(1, result.columns().length);
+    assertEquals("id", result.columns()[0].name());
+  }
+
+  @Test
+  void testLoadTableNotFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"missing");
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertThrows(NoSuchTableException.class, () -> ops.loadTable(ident));
+  }
+
+  // -------------------------------------------------------------------------
+  // createTable
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testCreateTable() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"mytable");
+    Column[] columns = {
+      
GlueColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(true).build(),
+      GlueColumn.builder()
+          .withName("name")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .build()
+    };
+
+    GlueTable result =
+        ops.createTable(
+            ident,
+            columns,
+            "my comment",
+            Map.of(GlueConstants.LOCATION, "s3://bucket/path"),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            SortOrders.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    verify(mockClient).createTable(any(CreateTableRequest.class));
+    assertEquals("mytable", result.name());
+    assertEquals("my comment", result.comment());
+    assertEquals(2, result.columns().length);
+  }
+
+  @Test
+  void testCreateTableAlreadyExists() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"mytable");
+    when(mockClient.createTable(any(CreateTableRequest.class)))
+        .thenThrow(AlreadyExistsException.builder().message("exists").build());
+
+    assertThrows(
+        TableAlreadyExistsException.class,
+        () ->
+            ops.createTable(
+                ident,
+                new Column[0],
+                null,
+                Collections.emptyMap(),
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                SortOrders.NONE,
+                Indexes.EMPTY_INDEXES));
+  }
+
+  @Test
+  void testCreateTableRejectsIndexes() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"mytable");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.createTable(
+                ident,
+                new Column[0],
+                null,
+                Collections.emptyMap(),
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                SortOrders.NONE,
+                new org.apache.gravitino.rel.indexes.Index[] {
+                  mock(org.apache.gravitino.rel.indexes.Index.class)
+                }));
+  }
+
+  @Test
+  void testCreateTableStorageDescriptorProperties() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"mytable");
+    Map<String, String> props =
+        Map.of(
+            GlueConstants.LOCATION, "s3://my-bucket/path",
+            GlueConstants.INPUT_FORMAT, 
"org.apache.hadoop.mapred.TextInputFormat",
+            GlueConstants.OUTPUT_FORMAT,
+                "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
+            GlueConstants.TABLE_TYPE, "EXTERNAL_TABLE");
+
+    ArgumentCaptor<CreateTableRequest> captor = 
ArgumentCaptor.forClass(CreateTableRequest.class);
+
+    ops.createTable(
+        ident,
+        new Column[0],
+        "comment",
+        props,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        SortOrders.NONE,
+        Indexes.EMPTY_INDEXES);
+
+    verify(mockClient).createTable(captor.capture());
+    CreateTableRequest req = captor.getValue();
+    assertEquals("EXTERNAL_TABLE", req.tableInput().tableType());
+    assertEquals("s3://my-bucket/path", 
req.tableInput().storageDescriptor().location());
+    
assertFalse(req.tableInput().parameters().containsKey(GlueConstants.LOCATION));
+    
assertFalse(req.tableInput().parameters().containsKey(GlueConstants.TABLE_TYPE));
+  }
+
+  // -------------------------------------------------------------------------
+  // alterTable
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testAlterTable() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"old");
+    Table glueTable =
+        Table.builder()
+            .name("old")
+            .description("old comment")
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+    when(mockClient.updateTable(any(UpdateTableRequest.class)))
+        .thenReturn(UpdateTableResponse.builder().build());
+
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+
+    GlueTable result =
+        ops.alterTable(ident, TableChange.rename("new"), 
TableChange.updateComment("new comment"));
+
+    verify(mockClient).updateTable(captor.capture());
+    assertEquals("new", captor.getValue().tableInput().name());
+    assertEquals("new comment", result.comment());
+  }
+
+  @Test
+  void testAlterTableSetProperty() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"t");
+    Table glueTable =
+        Table.builder()
+            .name("t")
+            .parameters(Map.of("existing", "v1"))
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+    when(mockClient.updateTable(any(UpdateTableRequest.class)))
+        .thenReturn(UpdateTableResponse.builder().build());
+
+    GlueTable result = ops.alterTable(ident, TableChange.setProperty("newKey", 
"newVal"));
+
+    assertEquals("newVal", result.properties().get("newKey"));
+    assertEquals("v1", result.properties().get("existing"));
+  }
+
+  @Test
+  void testAlterTableAddColumn() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"t");
+    Table glueTable =
+        Table.builder()
+            .name("t")
+            .storageDescriptor(
+                StorageDescriptor.builder()
+                    .columns(
+                        List.of(
+                            
software.amazon.awssdk.services.glue.model.Column.builder()
+                                .name("id")
+                                .type("bigint")
+                                .build()))
+                    .build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+    when(mockClient.updateTable(any(UpdateTableRequest.class)))
+        .thenReturn(UpdateTableResponse.builder().build());
+
+    GlueTable result =
+        ops.alterTable(
+            ident, TableChange.addColumn(new String[] {"email"}, 
Types.StringType.get()));
+
+    assertEquals(2, result.columns().length);
+    assertEquals("email", result.columns()[1].name());
+  }
+
+  @Test
+  void testAlterTableNotFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"missing");
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertThrows(
+        NoSuchTableException.class, () -> ops.alterTable(ident, 
TableChange.updateComment("x")));
+  }
+
+  // -------------------------------------------------------------------------
+  // dropTable
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testDropTable() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"t");
+
+    assertTrue(ops.dropTable(ident));
+    verify(mockClient).deleteTable(any(DeleteTableRequest.class));
+  }
+
+  @Test
+  void testDropTableNotFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"missing");
+    when(mockClient.deleteTable(any(DeleteTableRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertFalse(ops.dropTable(ident));
+  }
+
+  @Test
+  void testDropTableWithCatalogId() {
+    ops.catalogId = "123456789012";
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"t");
+    ArgumentCaptor<DeleteTableRequest> captor = 
ArgumentCaptor.forClass(DeleteTableRequest.class);
+
+    ops.dropTable(ident);
+
+    verify(mockClient).deleteTable(captor.capture());
+    assertEquals("123456789012", captor.getValue().catalogId());
+    assertEquals("mydb", captor.getValue().databaseName());
+    assertEquals("t", captor.getValue().name());
+  }
+}
diff --git 
a/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java
 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java
new file mode 100644
index 0000000000..14e294eef5
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.partitions.IdentityPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+
+class TestGlueTableOperations {
+
+  private GlueClient mockClient;
+  private GlueTableOperations ops;
+
+  @BeforeEach
+  void setup() {
+    mockClient = mock(GlueClient.class);
+    ops =
+        new GlueTableOperations(mockClient, null, "mydb", "mytable", new 
String[] {"dt", "region"});
+  }
+
+  // -------------------------------------------------------------------------
+  // listPartitionNames
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testListPartitionNames() {
+    software.amazon.awssdk.services.glue.model.Partition p1 =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-01", "us")
+            .build();
+    software.amazon.awssdk.services.glue.model.Partition p2 =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-02", "eu")
+            .build();
+
+    when(mockClient.getPartitions(any(GetPartitionsRequest.class)))
+        
.thenReturn(GetPartitionsResponse.builder().partitions(p1).nextToken("tok").build())
+        
.thenReturn(GetPartitionsResponse.builder().partitions(p2).nextToken(null).build());
+
+    String[] names = ops.listPartitionNames();
+
+    assertEquals(2, names.length);
+    assertEquals("dt=2024-01-01/region=us", names[0]);
+    assertEquals("dt=2024-01-02/region=eu", names[1]);
+  }
+
+  @Test
+  void testListPartitionNamesReturnsEmptyArray() {
+    when(mockClient.getPartitions(any(GetPartitionsRequest.class)))
+        
.thenReturn(GetPartitionsResponse.builder().partitions(List.of()).nextToken(null).build());
+
+    assertEquals(0, ops.listPartitionNames().length);
+  }
+
+  // -------------------------------------------------------------------------
+  // listPartitions
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testListPartitions() {
+    software.amazon.awssdk.services.glue.model.Partition p =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-01", "us")
+            .build();
+
+    when(mockClient.getPartitions(any(GetPartitionsRequest.class)))
+        
.thenReturn(GetPartitionsResponse.builder().partitions(p).nextToken(null).build());
+
+    Partition[] partitions = ops.listPartitions();
+
+    assertEquals(1, partitions.length);
+    IdentityPartition ip = (IdentityPartition) partitions[0];
+    assertEquals("dt=2024-01-01/region=us", ip.name());
+    assertEquals("2024-01-01", ip.values()[0].value());
+    assertEquals("us", ip.values()[1].value());
+  }
+
+  // -------------------------------------------------------------------------
+  // getPartition
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testGetPartition() {
+    software.amazon.awssdk.services.glue.model.Partition gluePartition =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-01", "us")
+            .build();
+
+    ArgumentCaptor<GetPartitionRequest> captor = 
ArgumentCaptor.forClass(GetPartitionRequest.class);
+    when(mockClient.getPartition(any(GetPartitionRequest.class)))
+        
.thenReturn(GetPartitionResponse.builder().partition(gluePartition).build());
+
+    Partition result = ops.getPartition("dt=2024-01-01/region=us");
+
+    verify(mockClient).getPartition(captor.capture());
+    assertEquals(List.of("2024-01-01", "us"), 
captor.getValue().partitionValues());
+    assertEquals("dt=2024-01-01/region=us", result.name());
+  }
+
+  @Test
+  void testGetPartitionNotFound() {
+    when(mockClient.getPartition(any(GetPartitionRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertThrows(NoSuchPartitionException.class, () -> 
ops.getPartition("dt=2024-01-01/region=us"));
+  }
+
+  // -------------------------------------------------------------------------
+  // addPartition
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testAddPartition() {
+    IdentityPartition partition =
+        Partitions.identity(
+            "dt=2024-01-01/region=us",
+            new String[][] {{"dt"}, {"region"}},
+            new org.apache.gravitino.rel.expressions.literals.Literal[] {
+              Literals.stringLiteral("2024-01-01"), 
Literals.stringLiteral("us")
+            },
+            java.util.Collections.emptyMap());
+
+    ArgumentCaptor<CreatePartitionRequest> captor =
+        ArgumentCaptor.forClass(CreatePartitionRequest.class);
+
+    Partition result = ops.addPartition(partition);
+
+    verify(mockClient).createPartition(captor.capture());
+    assertEquals(List.of("2024-01-01", "us"), 
captor.getValue().partitionInput().values());
+    assertEquals("dt=2024-01-01/region=us", result.name());
+  }
+
+  @Test
+  void testAddPartitionAlreadyExists() {
+    IdentityPartition partition =
+        Partitions.identity(
+            "dt=2024-01-01/region=us",
+            new String[][] {{"dt"}, {"region"}},
+            new org.apache.gravitino.rel.expressions.literals.Literal[] {
+              Literals.stringLiteral("2024-01-01"), 
Literals.stringLiteral("us")
+            },
+            java.util.Collections.emptyMap());
+
+    when(mockClient.createPartition(any(CreatePartitionRequest.class)))
+        .thenThrow(AlreadyExistsException.builder().message("exists").build());
+
+    assertThrows(PartitionAlreadyExistsException.class, () -> 
ops.addPartition(partition));
+  }
+
+  @Test
+  void testAddPartitionRejectsNonIdentity() {
+    Partition nonIdentity = mock(Partition.class);
+    assertThrows(IllegalArgumentException.class, () -> 
ops.addPartition(nonIdentity));
+  }
+
+  // -------------------------------------------------------------------------
+  // dropPartition
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testDropPartition() {
+    ArgumentCaptor<DeletePartitionRequest> captor =
+        ArgumentCaptor.forClass(DeletePartitionRequest.class);
+
+    boolean result = ops.dropPartition("dt=2024-01-01/region=us");
+
+    verify(mockClient).deletePartition(captor.capture());
+    assertTrue(result);
+    assertEquals(List.of("2024-01-01", "us"), 
captor.getValue().partitionValues());
+    assertEquals("mydb", captor.getValue().databaseName());
+    assertEquals("mytable", captor.getValue().tableName());
+  }
+
+  @Test
+  void testDropPartitionNotFound() {
+    when(mockClient.deletePartition(any(DeletePartitionRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertFalse(ops.dropPartition("dt=2024-01-01/region=us"));
+  }
+
+  @Test
+  void testDropPartitionWithCatalogId() {
+    GlueTableOperations opsWithId =
+        new GlueTableOperations(mockClient, "123456789012", "mydb", "mytable", 
new String[] {"dt"});
+    ArgumentCaptor<DeletePartitionRequest> captor =
+        ArgumentCaptor.forClass(DeletePartitionRequest.class);
+
+    opsWithId.dropPartition("dt=2024-01-01");
+
+    verify(mockClient).deletePartition(captor.capture());
+    assertEquals("123456789012", captor.getValue().catalogId());
+  }
+}

Reply via email to