This is an automated email from the ASF dual-hosted git repository.

diqiu50 pushed a commit to branch glue-pr04
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 7290b412fb11579ecc05f75fa814e90e55817508
Author: diqiu50 <[email protected]>
AuthorDate: Tue Apr 21 16:09:36 2026 +0800

    feat(catalog-glue): implement schema/table CRUD, partition support, and 
exception mapping
---
 .../catalog/glue/GlueCatalogOperations.java        | 651 ++++++++++++++++++++-
 .../catalog/glue/GlueExceptionConverter.java       |  74 +++
 .../apache/gravitino/catalog/glue/GlueTable.java   |  41 +-
 .../catalog/glue/GlueTableOperations.java          | 249 ++++++++
 .../catalog/glue/TestGlueCatalogOperations.java    | 314 ++++++++++
 .../glue/TestGlueCatalogOperationsTable.java       | 399 +++++++++++++
 .../catalog/glue/TestGlueTableOperations.java      | 235 ++++++++
 7 files changed, 1950 insertions(+), 13 deletions(-)

diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
index 22a2175f7b..3aea48e7b6 100644
--- 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java
@@ -18,32 +18,121 @@
  */
 package org.apache.gravitino.catalog.glue;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Order;
+import software.amazon.awssdk.services.glue.model.SerDeInfo;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
 
 /**
  * Operations implementation for the AWS Glue Data Catalog connector.
  *
- * <p>This is a scaffold stub. Full implementation will be added in PR-02 
through PR-06.
- *
- * <p>TODO PR-04: implement SupportsSchemas (Schema CRUD + exception mapping)
- *
- * <p>TODO PR-05: implement TableCatalog (Table CRUD + type detection)
+ * <p>Implements schema CRUD (via {@link SupportsSchemas}) and table CRUD (via 
{@link TableCatalog})
+ * backed by the AWS Glue API.
  */
-public class GlueCatalogOperations implements CatalogOperations {
+public class GlueCatalogOperations implements CatalogOperations, 
SupportsSchemas, TableCatalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GlueCatalogOperations.class);
+
+  /** Table property keys that map to StorageDescriptor fields, not to 
Table.parameters(). */
+  private static final Set<String> SD_TABLE_PROPERTY_KEYS =
+      ImmutableSet.of(
+          GlueConstants.LOCATION,
+          GlueConstants.INPUT_FORMAT,
+          GlueConstants.OUTPUT_FORMAT,
+          GlueConstants.SERDE_LIB,
+          GlueConstants.SERDE_NAME);
+
+  /** Property keys that map to top-level TableInput fields, not to 
Table.parameters(). */
+  private static final Set<String> TABLE_LEVEL_KEYS = 
ImmutableSet.of(GlueConstants.TABLE_TYPE);
+
+  @VisibleForTesting GlueClient glueClient;
+
+  /** Nullable — when null, Glue uses the caller's AWS account ID. */
+  @VisibleForTesting String catalogId;
 
-  // TODO PR-04: add GlueClient field and catalogId
+  /** Nullable — when null all table formats are exposed. */
+  @VisibleForTesting Set<String> tableFormatFilter;
 
   @Override
   public void initialize(
       Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
       throws RuntimeException {
-    // TODO PR-04: build GlueClient via GlueClientProvider and store catalogId
+    // Build the Glue client from the catalog configuration and remember the optional catalog ID
+    // (config.get returns null when the key is absent).
+    this.glueClient = GlueClientProvider.buildClient(config);
+    this.catalogId = config.get(GlueConstants.AWS_GLUE_CATALOG_ID);
+    String filterProp =
+        config.getOrDefault(
+            GlueConstants.TABLE_FORMAT_FILTER, 
GlueConstants.DEFAULT_TABLE_FORMAT_FILTER);
+    // Only a value different from the default (compared ignoring case) is parsed: it is split on
+    // commas, each entry trimmed and lower-cased. Otherwise tableFormatFilter is not assigned here.
+    if 
(!GlueConstants.DEFAULT_TABLE_FORMAT_FILTER.equalsIgnoreCase(filterProp)) {
+      tableFormatFilter =
+          Arrays.stream(filterProp.split(","))
+              .map(String::trim)
+              .map(s -> s.toLowerCase(Locale.ROOT))
+              .collect(Collectors.toSet());
+    }
   }
 
   @Override
@@ -54,11 +143,553 @@ public class GlueCatalogOperations implements 
CatalogOperations {
       String comment,
       Map<String, String> properties)
       throws Exception {
-    // TODO PR-04: call GlueClient.getDatabases() to verify connectivity
+    try {
+      GetDatabasesRequest.Builder req = 
GetDatabasesRequest.builder().maxResults(1);
+      applyCatalogId(req::catalogId);
+      glueClient.getDatabases(req.build());
+    } catch (GlueException e) {
+      throw new ConnectionFailedException(e, "Failed to connect to AWS Glue: 
%s", e.getMessage());
+    }
   }
 
   @Override
   public void close() throws IOException {
-    // TODO PR-04: close GlueClient
+    // Close the client if one was built, then clear the field so a repeated close() does nothing.
+    if (glueClient != null) {
+      glueClient.close();
+      glueClient = null;
+    }
+  }
+
+  /**
+   * Lists every Glue database as a schema identifier under {@code namespace}, following the
+   * {@code nextToken} pagination until no further page is reported.
+   */
+  @Override
+  public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
+    List<NameIdentifier> schemas = new ArrayList<>();
+    try {
+      String pageToken = null;
+      do {
+        GetDatabasesRequest.Builder request = GetDatabasesRequest.builder();
+        applyCatalogId(request::catalogId);
+        if (pageToken != null) {
+          request.nextToken(pageToken);
+        }
+        GetDatabasesResponse page = glueClient.getDatabases(request.build());
+        page.databaseList().forEach(db -> schemas.add(NameIdentifier.of(namespace, db.name())));
+        pageToken = page.nextToken();
+      } while (pageToken != null);
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "listing schemas under " + namespace);
+    }
+    return schemas.toArray(new NameIdentifier[0]);
+  }
+
+  /**
+   * Creates a Glue database named after {@code ident} and returns a schema describing it.
+   *
+   * <p>A {@code null} property map is treated as empty. The returned audit info carries the
+   * current user name and the current time.
+   */
+  @Override
+  public GlueSchema createSchema(
+      NameIdentifier ident, String comment, Map<String, String> properties)
+      throws NoSuchCatalogException, SchemaAlreadyExistsException {
+    String dbName = ident.name();
+    Map<String, String> dbParams = properties == null ? Collections.emptyMap() : properties;
+
+    CreateDatabaseRequest.Builder request =
+        CreateDatabaseRequest.builder()
+            .databaseInput(
+                DatabaseInput.builder()
+                    .name(dbName)
+                    .description(comment)
+                    .parameters(dbParams)
+                    .build());
+    applyCatalogId(request::catalogId);
+
+    try {
+      glueClient.createDatabase(request.build());
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + dbName);
+    }
+
+    LOG.info("Created Glue schema (database) {}", dbName);
+
+    AuditInfo audit =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentUserName())
+            .withCreateTime(Instant.now())
+            .build();
+    return GlueSchema.builder()
+        .withName(dbName)
+        .withComment(comment)
+        .withProperties(dbParams)
+        .withAuditInfo(audit)
+        .build();
+  }
+
+  /**
+   * Fetches the Glue database named by {@code ident} and converts it to a {@link GlueSchema}.
+   * Glue failures are translated by {@link GlueExceptionConverter#toSchemaException}.
+   */
+  @Override
+  public GlueSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
+    String dbName = ident.name();
+    GetDatabaseRequest.Builder request = GetDatabaseRequest.builder().name(dbName);
+    applyCatalogId(request::catalogId);
+    try {
+      GlueSchema loaded =
+          GlueSchema.fromGlueDatabase(glueClient.getDatabase(request.build()).database());
+      LOG.info("Loaded Glue schema (database) {}", dbName);
+      return loaded;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + dbName);
+    }
+  }
+
+  /**
+   * Applies property changes to an existing Glue database and returns the resulting schema.
+   *
+   * <p>Only {@link SchemaChange.SetProperty} and {@link SchemaChange.RemoveProperty} are accepted;
+   * any other change type is rejected with {@link IllegalArgumentException}. The description and
+   * the audit info are carried over from the loaded schema.
+   */
+  @Override
+  public GlueSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
+      throws NoSuchSchemaException {
+    GlueSchema existing = loadSchema(ident);
+    String dbName = ident.name();
+    Map<String, String> updatedProps = new HashMap<>(existing.properties());
+
+    for (SchemaChange change : changes) {
+      if (change instanceof SchemaChange.SetProperty) {
+        SchemaChange.SetProperty set = (SchemaChange.SetProperty) change;
+        updatedProps.put(set.getProperty(), set.getValue());
+        continue;
+      }
+      if (change instanceof SchemaChange.RemoveProperty) {
+        updatedProps.remove(((SchemaChange.RemoveProperty) change).getProperty());
+        continue;
+      }
+      throw new IllegalArgumentException(
+          "Unsupported schema change: " + change.getClass().getSimpleName());
+    }
+
+    UpdateDatabaseRequest.Builder request =
+        UpdateDatabaseRequest.builder()
+            .name(dbName)
+            .databaseInput(
+                DatabaseInput.builder()
+                    .name(dbName)
+                    .description(existing.comment())
+                    .parameters(updatedProps)
+                    .build());
+    applyCatalogId(request::catalogId);
+
+    try {
+      glueClient.updateDatabase(request.build());
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + dbName);
+    }
+
+    LOG.info("Altered Glue schema (database) {}", dbName);
+
+    return GlueSchema.builder()
+        .withName(dbName)
+        .withComment(existing.comment())
+        .withProperties(updatedProps)
+        .withAuditInfo(existing.auditInfo())
+        .build();
+  }
+
+  /**
+   * Drops the Glue database named by {@code ident}.
+   *
+   * <p>Without {@code cascade}, the database is first probed for tables (a single result is
+   * enough) and the drop is refused if any exist.
+   *
+   * @param ident identifier whose name is the Glue database name
+   * @param cascade when {@code false}, refuse to drop a database that still contains tables
+   * @return {@code true} if the database was deleted, {@code false} if it does not exist
+   * @throws NonEmptySchemaException if {@code cascade} is {@code false} and a table exists
+   */
+  @Override
+  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
+    if (!cascade) {
+      GetTablesRequest.Builder tabReq =
+          GetTablesRequest.builder().databaseName(ident.name()).maxResults(1);
+      applyCatalogId(tabReq::catalogId);
+      boolean hasTables;
+      try {
+        hasTables = !glueClient.getTables(tabReq.build()).tableList().isEmpty();
+      } catch (EntityNotFoundException e) {
+        // The database itself is missing. Report "nothing dropped", exactly as the delete call
+        // below does, instead of surfacing NoSuchSchemaException only on the non-cascade path.
+        return false;
+      } catch (GlueException e) {
+        throw GlueExceptionConverter.toSchemaException(
+            e, "checking tables in schema " + ident.name());
+      }
+      if (hasTables) {
+        throw new NonEmptySchemaException(
+            "Schema %s is not empty. Use cascade=true to drop it with its tables.", ident.name());
+      }
+    }
+
+    DeleteDatabaseRequest.Builder req = DeleteDatabaseRequest.builder().name(ident.name());
+    applyCatalogId(req::catalogId);
+    try {
+      glueClient.deleteDatabase(req.build());
+      LOG.info("Dropped Glue schema (database) {}", ident.name());
+      return true;
+    } catch (EntityNotFoundException e) {
+      return false;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name());
+    }
+  }
+
+  /**
+   * Lists the tables of the Glue database addressed by the last level of {@code namespace},
+   * keeping only those accepted by {@link #matchesFormatFilter} and following {@code nextToken}
+   * pagination to the end.
+   */
+  @Override
+  public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
+    String dbName = schemaName(namespace);
+    List<NameIdentifier> tables = new ArrayList<>();
+    try {
+      String pageToken = null;
+      do {
+        GetTablesRequest.Builder request = GetTablesRequest.builder().databaseName(dbName);
+        applyCatalogId(request::catalogId);
+        if (pageToken != null) {
+          request.nextToken(pageToken);
+        }
+        GetTablesResponse page = glueClient.getTables(request.build());
+        for (Table glueTable : page.tableList()) {
+          if (matchesFormatFilter(glueTable)) {
+            tables.add(NameIdentifier.of(namespace, glueTable.name()));
+          }
+        }
+        pageToken = page.nextToken();
+      } while (pageToken != null);
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchSchemaException(e, "Schema %s does not exist", dbName);
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toSchemaException(e, "listing tables in schema " + dbName);
+    }
+    return tables.toArray(new NameIdentifier[0]);
+  }
+
+  /**
+   * Fetches a Glue table, converts it to a {@link GlueTable} and injects the client, catalog ID
+   * and database name into it via {@code initOpsContext}.
+   */
+  @Override
+  public GlueTable loadTable(NameIdentifier ident) throws NoSuchTableException {
+    String dbName = schemaName(ident.namespace());
+    String tableName = ident.name();
+    GetTableRequest.Builder request =
+        GetTableRequest.builder().databaseName(dbName).name(tableName);
+    applyCatalogId(request::catalogId);
+    try {
+      Table glueTable = glueClient.getTable(request.build()).table();
+      GlueTable loaded = GlueTable.fromGlueTable(glueTable);
+      loaded.initOpsContext(glueClient, catalogId, dbName);
+      LOG.info("Loaded Glue table {}.{}", dbName, tableName);
+      return loaded;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + tableName);
+    }
+  }
+
+  /**
+   * Creates a Glue table in the database addressed by the last level of the identifier's
+   * namespace.
+   *
+   * <p>Partition keys are taken from the trailing columns, so every partition transform must be
+   * an identity transform on a top-level field and the partition fields must be the last
+   * columns, in the same order. A {@code null} {@code partitions} or {@code indexes} array is
+   * treated as empty.
+   *
+   * @param ident table identifier; its namespace must have at least two levels
+   * @param columns all columns, data columns first and partition columns last
+   * @param comment table description, may be {@code null}
+   * @param properties table properties, {@code null} is treated as empty
+   * @param partitions identity partition transforms, {@code null} is treated as none
+   * @param distribution bucketing specification, {@code null} is treated as none
+   * @param sortOrders sort orders, {@code null} is treated as none
+   * @param indexes must be {@code null} or empty; Glue has no index support
+   * @return the created table with its Glue context initialised
+   * @throws IllegalArgumentException if indexes are given or the partitioning is not supported
+   */
+  @Override
+  public GlueTable createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+
+    Preconditions.checkArgument(
+        indexes == null || indexes.length == 0, "Glue catalog does not support indexes");
+    Preconditions.checkArgument(columns != null, "Columns must not be null");
+
+    Transform[] partitioning = partitions != null ? partitions : new Transform[0];
+    Preconditions.checkArgument(
+        partitioning.length <= columns.length,
+        "Table declares %s partition fields but only %s columns",
+        partitioning.length,
+        columns.length);
+
+    // buildTableInput slices the last partitioning.length columns off as partition keys, so the
+    // declared partition fields have to be exactly those trailing columns, in order.
+    int firstPartCol = columns.length - partitioning.length;
+    for (int i = 0; i < partitioning.length; i++) {
+      Transform t = partitioning[i];
+      Preconditions.checkArgument(
+          t instanceof Transforms.IdentityTransform,
+          "Glue catalog only supports identity partitioning, got: %s",
+          t.name());
+      String[] field = ((Transforms.IdentityTransform) t).fieldName();
+      Preconditions.checkArgument(
+          field.length == 1, "Glue catalog does not support nested field partitioning");
+      Preconditions.checkArgument(
+          columns[firstPartCol + i].name().equalsIgnoreCase(field[0]),
+          "Partition field %s must be placed at the end of the columns, in partition order",
+          field[0]);
+    }
+
+    String dbName = schemaName(ident.namespace());
+    Map<String, String> props = properties != null ? properties : Collections.emptyMap();
+
+    TableInput input =
+        buildTableInput(
+            ident.name(), comment, columns, props, partitioning, distribution, sortOrders);
+
+    CreateTableRequest.Builder req =
+        CreateTableRequest.builder().databaseName(dbName).tableInput(input);
+    applyCatalogId(req::catalogId);
+
+    try {
+      glueClient.createTable(req.build());
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + ident.name());
+    }
+
+    LOG.info("Created Glue table {}.{}", dbName, ident.name());
+
+    GlueTable created =
+        GlueTable.builder()
+            .withName(ident.name())
+            .withComment(comment)
+            .withColumns(columns)
+            .withProperties(props)
+            .withPartitioning(partitioning)
+            .withDistribution(distribution != null ? distribution : Distributions.NONE)
+            .withSortOrders(sortOrders != null ? sortOrders : new SortOrder[0])
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(PrincipalUtils.getCurrentUserName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+    created.initOpsContext(glueClient, catalogId, dbName);
+    return created;
+  }
+
+  /**
+   * Applies comment, property and column changes to an existing Glue table.
+   *
+   * <p>The table is loaded, the changes are applied in memory, and the result is written back
+   * with a single {@code UpdateTable} call. Partitioning, distribution and sort order are carried
+   * over unchanged.
+   *
+   * <p>Renaming is rejected: {@code UpdateTable} addresses the table by the name inside its
+   * {@code TableInput}, so submitting a different name does not rename the existing table.
+   *
+   * @param ident identifier of the table to alter
+   * @param changes changes to apply, in order
+   * @return the altered table with its Glue context initialised
+   * @throws NoSuchTableException if the table does not exist
+   * @throws IllegalArgumentException if a change is not supported by this catalog
+   */
+  @Override
+  public GlueTable alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+
+    GlueTable current = loadTable(ident);
+    String dbName = schemaName(ident.namespace());
+
+    String newComment = current.comment();
+    Map<String, String> newProps = new HashMap<>(current.properties());
+
+    // The trailing partitioning().length columns are treated as partition columns; everything
+    // before them is a data column.
+    int partCount = current.partitioning().length;
+    List<Column> allCols = new ArrayList<>(Arrays.asList(current.columns()));
+    List<Column> dataCols = new ArrayList<>(allCols.subList(0, allCols.size() - partCount));
+    List<Column> partCols =
+        new ArrayList<>(allCols.subList(allCols.size() - partCount, allCols.size()));
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.RenameTable) {
+        // Fail up front with a clear message instead of sending an UpdateTable request that
+        // would target a table carrying the new name.
+        throw new IllegalArgumentException(
+            "Glue catalog does not support renaming tables: cannot rename "
+                + ident.name()
+                + " to "
+                + ((TableChange.RenameTable) change).getNewName());
+      } else if (change instanceof TableChange.UpdateComment) {
+        newComment = ((TableChange.UpdateComment) change).getNewComment();
+      } else if (change instanceof TableChange.SetProperty) {
+        TableChange.SetProperty sp = (TableChange.SetProperty) change;
+        newProps.put(sp.getProperty(), sp.getValue());
+      } else if (change instanceof TableChange.RemoveProperty) {
+        newProps.remove(((TableChange.RemoveProperty) change).getProperty());
+      } else if (change instanceof TableChange.ColumnChange) {
+        applyColumnChange(dataCols, partCols, (TableChange.ColumnChange) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change: " + change.getClass().getSimpleName());
+      }
+    }
+
+    List<Column> newAllCols = new ArrayList<>(dataCols);
+    newAllCols.addAll(partCols);
+    Column[] newColumns = newAllCols.toArray(new Column[0]);
+
+    TableInput input =
+        buildTableInput(
+            current.name(),
+            newComment,
+            newColumns,
+            newProps,
+            current.partitioning(),
+            current.distribution(),
+            current.sortOrder());
+
+    UpdateTableRequest.Builder req =
+        UpdateTableRequest.builder().databaseName(dbName).tableInput(input);
+    applyCatalogId(req::catalogId);
+
+    try {
+      glueClient.updateTable(req.build());
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + ident.name());
+    }
+
+    LOG.info("Altered Glue table {}.{}", dbName, ident.name());
+
+    GlueTable altered =
+        GlueTable.builder()
+            .withName(current.name())
+            .withComment(newComment)
+            .withColumns(newColumns)
+            .withProperties(newProps)
+            .withPartitioning(current.partitioning())
+            .withDistribution(current.distribution())
+            .withSortOrders(current.sortOrder())
+            .withAuditInfo(current.auditInfo())
+            .build();
+    altered.initOpsContext(glueClient, catalogId, dbName);
+    return altered;
+  }
+
+  /**
+   * Deletes a Glue table.
+   *
+   * @return {@code true} if the table was deleted, {@code false} if Glue reports it as not found
+   */
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    String dbName = schemaName(ident.namespace());
+    String tableName = ident.name();
+    DeleteTableRequest.Builder request =
+        DeleteTableRequest.builder().databaseName(dbName).name(tableName);
+    applyCatalogId(request::catalogId);
+    try {
+      glueClient.deleteTable(request.build());
+    } catch (EntityNotFoundException e) {
+      return false;
+    } catch (GlueException e) {
+      throw GlueExceptionConverter.toTableException(e, "table " + tableName);
+    }
+    LOG.info("Dropped Glue table {}.{}", dbName, tableName);
+    return true;
+  }
+
+  /** Returns the last level of {@code namespace}, which must have at least two levels. */
+  private static String schemaName(Namespace namespace) {
+    String[] parts = namespace.levels();
+    int depth = parts.length;
+    Preconditions.checkArgument(
+        depth >= 2, "Namespace must have at least 2 levels, got: %s", depth);
+    return parts[depth - 1];
+  }
+
+  // NOTE: the parameter is the Glue SDK Table, not GlueTable (our domain class). The Glue SDK's
+  // Column model is referenced by its fully qualified name throughout this class because its
+  // simple name clashes with the imported org.apache.gravitino.rel.Column.
+  /**
+   * Tells whether {@code table} passes the configured table-format filter. With no filter every
+   * table passes; a table without a format parameter is matched using the default format value.
+   */
+  private boolean matchesFormatFilter(Table table) {
+    if (tableFormatFilter == null) {
+      return true;
+    }
+    String declared = null;
+    if (table.hasParameters()) {
+      declared = table.parameters().get(GlueConstants.TABLE_FORMAT);
+    }
+    String effective =
+        declared == null
+            ? GlueConstants.DEFAULT_TABLE_FORMAT_VALUE
+            : declared.toLowerCase(Locale.ROOT);
+    return tableFormatFilter.contains(effective);
+  }
+
+  /**
+   * Assembles the Glue {@code TableInput} used by both table creation and table update.
+   *
+   * <p>The last {@code partitions.length} entries of {@code columns} become partition keys; the
+   * entries before them become the storage descriptor's columns. Properties are routed by key:
+   * keys starting with the serde-parameter prefix go to the SerDe parameters (prefix stripped),
+   * keys in {@code SD_TABLE_PROPERTY_KEYS} or {@code TABLE_LEVEL_KEYS} are read into dedicated
+   * fields, and every other key lands in the table parameters.
+   *
+   * @param name table name
+   * @param comment table description
+   * @param columns data columns followed by partition columns
+   * @param properties table properties; must not be {@code null}
+   * @param partitions partition transforms; only their count is used here
+   * @param distribution bucketing specification, may be {@code null}
+   * @param sortOrders sort orders, may be {@code null}
+   * @return the assembled table input
+   */
+  private TableInput buildTableInput(
+      String name,
+      String comment,
+      Column[] columns,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders) {
+
+    // NOTE(review): dataCount goes negative when there are more partitions than columns; callers
+    // are presumably expected to rule that out -- confirm.
+    int partCount = partitions.length;
+    int dataCount = columns.length - partCount;
+
+    List<software.amazon.awssdk.services.glue.model.Column> glueDataCols = new 
ArrayList<>();
+    for (int i = 0; i < dataCount; i++) {
+      glueDataCols.add(toGlueColumn(columns[i]));
+    }
+
+    List<software.amazon.awssdk.services.glue.model.Column> gluePartCols = new 
ArrayList<>();
+    for (int i = dataCount; i < columns.length; i++) {
+      gluePartCols.add(toGlueColumn(columns[i]));
+    }
+
+    // Separate properties into: SerDe parameters (prefixed keys), storage-descriptor and
+    // table-level fields (skipped here, read individually below), and Table.parameters().
+    Map<String, String> serdeParams = new HashMap<>();
+    Map<String, String> tableParams = new HashMap<>();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith(GlueConstants.SERDE_PARAMETER_PREFIX)) {
+        serdeParams.put(
+            key.substring(GlueConstants.SERDE_PARAMETER_PREFIX.length()), 
entry.getValue());
+      } else if (!SD_TABLE_PROPERTY_KEYS.contains(key) && 
!TABLE_LEVEL_KEYS.contains(key)) {
+        tableParams.put(key, entry.getValue());
+      }
+    }
+
+    SerDeInfo serDe =
+        SerDeInfo.builder()
+            .serializationLibrary(properties.get(GlueConstants.SERDE_LIB))
+            .name(properties.get(GlueConstants.SERDE_NAME))
+            .parameters(serdeParams)
+            .build();
+
+    // Bucketing is only emitted for a real distribution with a positive bucket count;
+    // expressions that are not plain field references are dropped from the bucket columns.
+    List<String> bucketCols = Collections.emptyList();
+    int numBuckets = 0;
+    if (distribution != null && distribution != Distributions.NONE && 
distribution.number() > 0) {
+      numBuckets = distribution.number();
+      bucketCols =
+          Arrays.stream(distribution.expressions())
+              .filter(e -> e instanceof NamedReference.FieldReference)
+              .map(e -> String.join(".", ((NamedReference.FieldReference) 
e).fieldName()))
+              .collect(Collectors.toList());
+    }
+
+    // Sort orders on anything other than a field reference are skipped. The Glue sort flag is
+    // written as 1 for ascending and 0 otherwise.
+    List<Order> glueSortCols = new ArrayList<>();
+    if (sortOrders != null) {
+      for (SortOrder so : sortOrders) {
+        if (so.expression() instanceof NamedReference.FieldReference) {
+          String colName =
+              String.join(".", ((NamedReference.FieldReference) 
so.expression()).fieldName());
+          int order = so.direction() == SortDirection.ASCENDING ? 1 : 0;
+          
glueSortCols.add(Order.builder().column(colName).sortOrder(order).build());
+        }
+      }
+    }
+
+    StorageDescriptor sd =
+        StorageDescriptor.builder()
+            .columns(glueDataCols)
+            .location(properties.get(GlueConstants.LOCATION))
+            .inputFormat(properties.get(GlueConstants.INPUT_FORMAT))
+            .outputFormat(properties.get(GlueConstants.OUTPUT_FORMAT))
+            .serdeInfo(serDe)
+            .bucketColumns(bucketCols)
+            .numberOfBuckets(numBuckets)
+            .sortColumns(glueSortCols)
+            .build();
+
+    return TableInput.builder()
+        .name(name)
+        .description(comment)
+        .tableType(properties.get(GlueConstants.TABLE_TYPE))
+        .parameters(tableParams)
+        .storageDescriptor(sd)
+        .partitionKeys(gluePartCols)
+        .build();
+  }
+
+  /**
+   * Converts a Gravitino column to the Glue SDK column model, copying its name, its converted
+   * data type and its comment.
+   */
+  private static software.amazon.awssdk.services.glue.model.Column toGlueColumn(Column col) {
+    software.amazon.awssdk.services.glue.model.Column.Builder glueCol =
+        software.amazon.awssdk.services.glue.model.Column.builder();
+    glueCol.name(col.name());
+    glueCol.type(GlueTypeConverter.fromGravitino(col.dataType()));
+    glueCol.comment(col.comment());
+    return glueCol.build();
+  }
+
+  /**
+   * Applies one column change to the in-memory column lists of a table being altered.
+   *
+   * <p>Only top-level columns are supported. Partition columns may have their comment updated
+   * but cannot be deleted, renamed or retyped. A change that targets a column which does not
+   * exist is rejected rather than silently ignored, except for a delete flagged "if exists".
+   *
+   * @param dataCols mutable list of data columns, modified in place
+   * @param partCols mutable list of partition columns, modified in place for comment updates
+   * @param change the change to apply
+   * @throws IllegalArgumentException if the change is unsupported, addresses a nested field,
+   *     touches a partition column in a forbidden way, or targets a missing column
+   */
+  private static void applyColumnChange(
+      List<Column> dataCols, List<Column> partCols, TableChange.ColumnChange change) {
+
+    String[] path = change.fieldName();
+    // Taking only path[0] of a nested path would act on the enclosing top-level column.
+    Preconditions.checkArgument(
+        path.length == 1,
+        "Glue catalog does not support changes to nested fields: %s",
+        String.join(".", path));
+    String fieldName = path[0];
+    boolean isPartition = partCols.stream().anyMatch(c -> c.name().equals(fieldName));
+
+    if (change instanceof TableChange.AddColumn) {
+      TableChange.AddColumn add = (TableChange.AddColumn) change;
+      dataCols.add(
+          GlueColumn.builder()
+              .withName(fieldName)
+              .withType(add.getDataType())
+              .withComment(add.getComment())
+              .withNullable(add.isNullable())
+              .build());
+
+    } else if (change instanceof TableChange.DeleteColumn) {
+      Preconditions.checkArgument(!isPartition, "Cannot delete partition column: %s", fieldName);
+      boolean removed = dataCols.removeIf(c -> c.name().equals(fieldName));
+      boolean ifExists = Boolean.TRUE.equals(((TableChange.DeleteColumn) change).getIfExists());
+      Preconditions.checkArgument(
+          removed || ifExists, "Cannot delete column %s: it does not exist", fieldName);
+
+    } else if (change instanceof TableChange.RenameColumn) {
+      Preconditions.checkArgument(!isPartition, "Cannot rename partition column: %s", fieldName);
+      String newColName = ((TableChange.RenameColumn) change).getNewName();
+      boolean renamed =
+          replaceColumn(
+              dataCols,
+              fieldName,
+              old -> copyColumn(old, newColName, old.dataType(), old.comment()));
+      Preconditions.checkArgument(
+          renamed, "Cannot rename column %s: it does not exist", fieldName);
+
+    } else if (change instanceof TableChange.UpdateColumnType) {
+      Preconditions.checkArgument(
+          !isPartition, "Cannot update type of partition column: %s", fieldName);
+      Type newType = ((TableChange.UpdateColumnType) change).getNewDataType();
+      boolean retyped =
+          replaceColumn(
+              dataCols, fieldName, old -> copyColumn(old, old.name(), newType, old.comment()));
+      Preconditions.checkArgument(
+          retyped, "Cannot update type of column %s: it does not exist", fieldName);
+
+    } else if (change instanceof TableChange.UpdateColumnComment) {
+      String newCmt = ((TableChange.UpdateColumnComment) change).getNewComment();
+      // Comments may be changed on data and partition columns alike; try data columns first.
+      boolean updated =
+          replaceColumn(
+                  dataCols, fieldName, old -> copyColumn(old, old.name(), old.dataType(), newCmt))
+              || replaceColumn(
+                  partCols, fieldName, old -> copyColumn(old, old.name(), old.dataType(), newCmt));
+      Preconditions.checkArgument(
+          updated, "Cannot update comment of column %s: it does not exist", fieldName);
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported column change: " + change.getClass().getSimpleName());
+    }
+  }
+
+  /**
+   * Hands the configured catalog ID to {@code setter}, typically a request builder's
+   * {@code catalogId} method. Does nothing when no catalog ID is configured.
+   */
+  private void applyCatalogId(Consumer<String> setter) {
+    if (catalogId != null) setter.accept(catalogId);
+  }
+
+  /**
+   * Finds the first column matching {@code name} in {@code cols}, replaces it with the result of
+   * {@code updater}, and returns {@code true} if a replacement was made.
+   */
+  private static boolean replaceColumn(
+      List<Column> cols, String name, UnaryOperator<Column> updater) {
+    for (int i = 0; i < cols.size(); i++) {
+      if (cols.get(i).name().equals(name)) {
+        cols.set(i, updater.apply(cols.get(i)));
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Builds a new {@code GlueColumn} from the given name, type and comment, taking only the
+   * nullability from {@code src}. No other attribute of {@code src} is copied.
+   */
+  private static Column copyColumn(Column src, String name, Type type, String comment) {
+    boolean keepNullable = src.nullable();
+    return GlueColumn.builder()
+        .withName(name)
+        .withType(type)
+        .withComment(comment)
+        .withNullable(keepNullable)
+        .build();
   }
 }
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java
new file mode 100644
index 0000000000..7857875e0c
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.InvalidInputException;
+
+/**
+ * Translates AWS Glue SDK exceptions into Gravitino exceptions.
+ *
+ * <p>"Not found" and "already exists" map to the schema- or table-specific Gravitino exception;
+ * every other Glue failure goes through one shared fallback mapping.
+ */
+final class GlueExceptionConverter {
+
+  private GlueExceptionConverter() {}
+
+  /**
+   * Maps a Glue failure raised by a schema (database) operation.
+   *
+   * @param e the Glue exception to convert
+   * @param context description of the operation, used in the error message
+   * @return a Gravitino or standard Java runtime exception wrapping {@code e}
+   */
+  static RuntimeException toSchemaException(GlueException e, String context) {
+    if (e instanceof EntityNotFoundException) {
+      return new NoSuchSchemaException(e, "%s does not exist", context);
+    } else if (e instanceof AlreadyExistsException) {
+      return new SchemaAlreadyExistsException(e, "%s already exists", context);
+    }
+    return toGenericException(e, context);
+  }
+
+  /**
+   * Maps a Glue failure raised by a table operation.
+   *
+   * @param e the Glue exception to convert
+   * @param context description of the operation, used in the error message
+   * @return a Gravitino or standard Java runtime exception wrapping {@code e}
+   */
+  static RuntimeException toTableException(GlueException e, String context) {
+    if (e instanceof EntityNotFoundException) {
+      return new NoSuchTableException(e, "%s does not exist", context);
+    } else if (e instanceof AlreadyExistsException) {
+      return new TableAlreadyExistsException(e, "%s already exists", context);
+    }
+    return toGenericException(e, context);
+  }
+
+  /**
+   * Fallback shared by both converters: invalid input becomes {@link IllegalArgumentException},
+   * anything else a plain {@link RuntimeException}; the Glue exception is kept as the cause.
+   */
+  private static RuntimeException toGenericException(GlueException e, String context) {
+    return e instanceof InvalidInputException
+        ? new IllegalArgumentException(context + ": " + e.getMessage(), e)
+        : new RuntimeException("Glue error: " + context, e);
+  }
+}
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
index 9d0fd0d1e0..d080e56eec 100644
--- 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java
@@ -26,7 +26,9 @@ import static 
org.apache.gravitino.catalog.glue.GlueConstants.SERDE_NAME;
 import static 
org.apache.gravitino.catalog.glue.GlueConstants.SERDE_PARAMETER_PREFIX;
 import static org.apache.gravitino.catalog.glue.GlueConstants.TABLE_TYPE;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +39,7 @@ import org.apache.gravitino.connector.BaseTable;
 import org.apache.gravitino.connector.TableOperations;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.SupportsPartitions;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
@@ -45,6 +48,7 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.sorts.SortOrders;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import software.amazon.awssdk.services.glue.GlueClient;
 import software.amazon.awssdk.services.glue.model.StorageDescriptor;
 import software.amazon.awssdk.services.glue.model.Table;
 
@@ -58,13 +62,44 @@ import software.amazon.awssdk.services.glue.model.Table;
 @ToString
 public class GlueTable extends BaseTable {
 
+  /**
+   * Glue client injected after construction to support partition operations.
+   *
+   * <p>NOTE(review): the class carries {@code @ToString}; if that is Lombok's annotation, this
+   * field is rendered in {@code toString()} output — confirm that is intended.
+   */
+  private GlueClient glueClient;
+
+  /** Nullable — when null, Glue uses the caller's AWS account ID. */
+  private String catalogId;
+
+  /** Database (schema) this table belongs to. */
+  private String dbName;
+
   private GlueTable() {}
 
+  /**
+   * Supplies the Glue connection context that partition operations need.
+   *
+   * <p>Called by {@link GlueCatalogOperations} after constructing a table instance.
+   *
+   * @param glueClient client later handed to the table's partition operations
+   * @param catalogId Glue catalog ID; may be {@code null}
+   * @param dbName name of the database (schema) this table belongs to
+   */
+  void initOpsContext(GlueClient glueClient, String catalogId, String dbName) {
+    this.dbName = dbName;
+    this.catalogId = catalogId;
+    this.glueClient = glueClient;
+  }
+
   @Override
   protected TableOperations newOps() {
-    // Partition operations are deferred to PR-06.
-    throw new UnsupportedOperationException(
-        "Partition operations are not yet supported for GlueTable");
+    Preconditions.checkState(
+        glueClient != null,
+        "Partition operations require Glue context; call initOpsContext() 
first");
+    String[] partColNames =
+        Arrays.stream(partitioning)
+            .filter(t -> t instanceof Transforms.IdentityTransform)
+            .map(t -> ((Transforms.IdentityTransform) t).fieldName()[0])
+            .toArray(String[]::new);
+    return new GlueTableOperations(glueClient, catalogId, dbName, name, 
partColNames);
+  }
+
+  @Override
+  public SupportsPartitions supportPartitions() throws 
UnsupportedOperationException {
+    return (SupportsPartitions) ops();
   }
 
   /**
diff --git 
a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java
 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java
new file mode 100644
index 0000000000..4f79417376
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.SupportsPartitions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.partitions.IdentityPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+
+/**
+ * Table-level partition operations for the AWS Glue Data Catalog.
+ *
+ * <p>Implements {@link SupportsPartitions} for Hive-format identity-partitioned tables. Partition
+ * names follow the Hive convention: {@code col=val/col2=val2}. Names are assembled and split
+ * verbatim, without Hive path escaping, so a value containing {@code /} does not round-trip.
+ *
+ * <p>Only {@link IdentityPartition} is supported; other partition types throw {@link
+ * IllegalArgumentException}.
+ */
+class GlueTableOperations implements TableOperations, SupportsPartitions {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class);
+
+  private final GlueClient glueClient;
+  /** Nullable — when null, Glue uses the caller's AWS account ID. */
+  private final String catalogId;
+
+  private final String dbName;
+  private final String tableName;
+  /** Ordered partition column names, matching the table's {@code partitionKeys()} order. */
+  private final String[] partitionColNames;
+
+  /**
+   * Creates partition operations bound to one Glue table.
+   *
+   * @param glueClient client used for every Glue request; it is not closed by this class
+   * @param catalogId Glue catalog ID, or {@code null} to leave it out of requests
+   * @param dbName name of the Glue database containing the table
+   * @param tableName name of the Glue table
+   * @param partitionColNames partition column names in partition-key order; {@code null} is
+   *     treated as "no partition columns"
+   */
+  GlueTableOperations(
+      GlueClient glueClient,
+      String catalogId,
+      String dbName,
+      String tableName,
+      String[] partitionColNames) {
+    this.glueClient = glueClient;
+    this.catalogId = catalogId;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    // Copy so that later changes to the caller's array cannot alter name building here.
+    this.partitionColNames =
+        partitionColNames == null ? new String[0] : partitionColNames.clone();
+  }
+
+  /**
+   * Lists the Hive-style names of all partitions of the table.
+   *
+   * @return one name per Glue partition, in the order Glue returned them
+   * @throws RuntimeException if Glue rejects the listing request
+   */
+  @Override
+  public String[] listPartitionNames() {
+    return fetchAllPartitions().stream()
+        .map(p -> buildPartitionName(p.values()))
+        .toArray(String[]::new);
+  }
+
+  /**
+   * Lists all partitions of the table as identity partitions.
+   *
+   * @return one partition per Glue partition, in the order Glue returned them
+   * @throws RuntimeException if Glue rejects the listing request
+   */
+  @Override
+  public Partition[] listPartitions() {
+    return fetchAllPartitions().stream()
+        .<Partition>map(this::toGravitinoPartition)
+        .toArray(Partition[]::new);
+  }
+
+  /**
+   * Loads a single partition by its Hive-style name.
+   *
+   * @param partitionName name such as {@code dt=2024-01-01/country=us}
+   * @return the matching partition
+   * @throws NoSuchPartitionException if Glue reports that the entity does not exist
+   * @throws IllegalArgumentException if {@code partitionName} is null or empty
+   * @throws RuntimeException for any other Glue failure
+   */
+  @Override
+  public Partition getPartition(String partitionName) throws NoSuchPartitionException {
+    List<String> values = parsePartitionName(partitionName);
+    GetPartitionRequest.Builder req =
+        GetPartitionRequest.builder()
+            .databaseName(dbName)
+            .tableName(tableName)
+            .partitionValues(values);
+    if (catalogId != null) {
+      req.catalogId(catalogId);
+    }
+    try {
+      return toGravitinoPartition(glueClient.getPartition(req.build()).partition());
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchPartitionException(
+          e, "Partition %s does not exist in table %s", partitionName, tableName);
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to get partition " + partitionName, e);
+    }
+  }
+
+  /**
+   * Creates a new partition in Glue.
+   *
+   * <p>Values are sent to Glue positionally, so the partition's fields must name the table's
+   * partition columns in partition-key order; anything else is rejected up front instead of being
+   * stored under the wrong column.
+   *
+   * <p>NOTE(review): the partition is created with an empty {@code StorageDescriptor}, so it has
+   * no location or format of its own — confirm this is what downstream readers expect.
+   *
+   * @param partition the partition to add; must be an {@link IdentityPartition}
+   * @return the added partition; its name is derived from the values when the input has none
+   * @throws PartitionAlreadyExistsException if Glue reports that the partition already exists
+   * @throws IllegalArgumentException if the partition is not an identity partition or its fields
+   *     do not match the table's partition columns
+   * @throws RuntimeException for any other Glue failure
+   */
+  @Override
+  public Partition addPartition(Partition partition) throws PartitionAlreadyExistsException {
+    Preconditions.checkArgument(
+        partition instanceof IdentityPartition, "Glue only supports identity partitions");
+    IdentityPartition ip = (IdentityPartition) partition;
+    validatePartitionFields(ip);
+
+    List<String> values = new ArrayList<>(ip.values().length);
+    for (Literal<?> v : ip.values()) {
+      values.add(v.value() != null ? v.value().toString() : null);
+    }
+    // Identity partitions may be built without a name; fall back to the Hive-style name.
+    String partitionName = partition.name() != null ? partition.name() : buildPartitionName(values);
+
+    PartitionInput input =
+        PartitionInput.builder()
+            .values(values)
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .build();
+
+    CreatePartitionRequest.Builder req =
+        CreatePartitionRequest.builder()
+            .databaseName(dbName)
+            .tableName(tableName)
+            .partitionInput(input);
+    if (catalogId != null) {
+      req.catalogId(catalogId);
+    }
+
+    try {
+      glueClient.createPartition(req.build());
+    } catch (AlreadyExistsException e) {
+      throw new PartitionAlreadyExistsException(
+          e, "Partition %s already exists in table %s", partitionName, tableName);
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to add partition " + partitionName, e);
+    }
+
+    LOG.info("Added partition {} to {}.{}", partitionName, dbName, tableName);
+    return Partitions.identity(partitionName, ip.fieldNames(), ip.values(), partition.properties());
+  }
+
+  @Override
+  public void close() {
+    // GlueClient lifecycle is managed by GlueCatalogOperations; nothing to close here.
+  }
+
+  /**
+   * Deletes a partition by its Hive-style name.
+   *
+   * @param partitionName name such as {@code dt=2024-01-01/country=us}
+   * @return {@code true} if Glue deleted the partition, {@code false} if Glue reported that the
+   *     entity does not exist
+   * @throws IllegalArgumentException if {@code partitionName} is null or empty
+   * @throws RuntimeException for any other Glue failure
+   */
+  @Override
+  public boolean dropPartition(String partitionName) {
+    List<String> values = parsePartitionName(partitionName);
+    DeletePartitionRequest.Builder req =
+        DeletePartitionRequest.builder()
+            .databaseName(dbName)
+            .tableName(tableName)
+            .partitionValues(values);
+    if (catalogId != null) {
+      req.catalogId(catalogId);
+    }
+    try {
+      glueClient.deletePartition(req.build());
+      LOG.info("Dropped partition {} from {}.{}", partitionName, dbName, tableName);
+      return true;
+    } catch (EntityNotFoundException e) {
+      return false;
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to drop partition " + partitionName, e);
+    }
+  }
+
+  /** Fetches every Glue partition of the table, following pagination tokens until exhausted. */
+  private List<software.amazon.awssdk.services.glue.model.Partition> fetchAllPartitions() {
+    List<software.amazon.awssdk.services.glue.model.Partition> fetched = new ArrayList<>();
+    String nextToken = null;
+    try {
+      do {
+        GetPartitionsRequest.Builder req =
+            GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName);
+        if (catalogId != null) {
+          req.catalogId(catalogId);
+        }
+        if (nextToken != null) {
+          req.nextToken(nextToken);
+        }
+        GetPartitionsResponse resp = glueClient.getPartitions(req.build());
+        fetched.addAll(resp.partitions());
+        nextToken = resp.nextToken();
+        // An empty token is treated like a missing one so the loop cannot spin on "".
+      } while (nextToken != null && !nextToken.isEmpty());
+    } catch (GlueException e) {
+      throw new RuntimeException("Failed to list partitions for table " + tableName, e);
+    }
+    return fetched;
+  }
+
+  /** Checks that the partition names exactly the table's partition columns, in key order. */
+  private void validatePartitionFields(IdentityPartition ip) {
+    String[][] fieldNames = ip.fieldNames();
+    Literal<?>[] values = ip.values();
+    Preconditions.checkArgument(
+        fieldNames != null
+            && values != null
+            && fieldNames.length == partitionColNames.length
+            && values.length == partitionColNames.length,
+        "Partition must supply exactly one field and value for each of the %s partition "
+            + "column(s) of table %s",
+        partitionColNames.length,
+        tableName);
+    for (int i = 0; i < partitionColNames.length; i++) {
+      String[] fieldName = fieldNames[i];
+      Preconditions.checkArgument(
+          fieldName != null
+              && fieldName.length == 1
+              && partitionColNames[i].equalsIgnoreCase(fieldName[0]),
+          "Partition field at position %s must be '%s' to match the partition keys of table %s",
+          i,
+          partitionColNames[i],
+          tableName);
+    }
+  }
+
+  /**
+   * Builds a Hive-style partition name (e.g. {@code dt=2024-01-01/country=us}) from an ordered list
+   * of Glue partition values. Columns without a corresponding value are left out.
+   */
+  private String buildPartitionName(List<String> values) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < partitionColNames.length && i < values.size(); i++) {
+      if (i > 0) {
+        sb.append('/');
+      }
+      sb.append(partitionColNames[i]).append('=').append(values.get(i));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Parses a Hive-style partition name (e.g. {@code dt=2024-01-01/country=us}) into an ordered list
+   * of values. A segment without {@code =} is taken as a bare value.
+   *
+   * @throws IllegalArgumentException if {@code partitionName} is null or empty
+   */
+  private static List<String> parsePartitionName(String partitionName) {
+    Preconditions.checkArgument(
+        partitionName != null && !partitionName.isEmpty(),
+        "Partition name must not be null or empty");
+    String[] parts = partitionName.split("/");
+    List<String> values = new ArrayList<>(parts.length);
+    for (String part : parts) {
+      // Split on the first '=' only, so values that themselves contain '=' stay intact.
+      int eq = part.indexOf('=');
+      values.add(eq >= 0 ? part.substring(eq + 1) : part);
+    }
+    return values;
+  }
+
+  /**
+   * Converts a Glue partition into an identity partition with one string literal per partition
+   * column; columns beyond the values Glue returned get a {@code null} literal value.
+   */
+  private IdentityPartition toGravitinoPartition(
+      software.amazon.awssdk.services.glue.model.Partition gluePartition) {
+    List<String> values = gluePartition.values();
+    String name = buildPartitionName(values);
+
+    String[][] fieldNames = new String[partitionColNames.length][];
+    Literal<?>[] literals = new Literal<?>[partitionColNames.length];
+    for (int i = 0; i < partitionColNames.length; i++) {
+      fieldNames[i] = new String[] {partitionColNames[i]};
+      literals[i] = Literals.stringLiteral(i < values.size() ? values.get(i) : null);
+    }
+    return Partitions.identity(name, fieldNames, literals, Collections.emptyMap());
+  }
+}
diff --git 
a/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperations.java
 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperations.java
new file mode 100644
index 0000000000..7a61a7684e
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperations.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+
+/** Unit tests for the schema operations of {@link GlueCatalogOperations} against a mock client. */
+class TestGlueCatalogOperations {
+
+  private static final String METALAKE = "metalake";
+  private static final String CATALOG = "catalog";
+
+  private GlueCatalogOperations ops;
+  private GlueClient mockClient;
+
+  @BeforeEach
+  void setup() {
+    mockClient = mock(GlueClient.class);
+    ops = new GlueCatalogOperations();
+    ops.glueClient = mockClient;
+    // catalogId is not assigned here; testDropSchema_withCatalogId sets it explicitly.
+  }
+
+  // -------------------------------------------------------------------------
+  // shared fixtures
+  // -------------------------------------------------------------------------
+
+  /** Identifier of a schema under the fixed test metalake and catalog. */
+  private static NameIdentifier schemaIdent(String schemaName) {
+    return NameIdentifier.of(METALAKE, CATALOG, schemaName);
+  }
+
+  /** Namespace of the fixed test catalog. */
+  private static Namespace catalogNamespace() {
+    return Namespace.of(METALAKE, CATALOG);
+  }
+
+  /** Glue database carrying only a name. */
+  private static Database database(String name) {
+    return Database.builder().name(name).build();
+  }
+
+  /** Makes every getDatabase call return the given database. */
+  private void stubGetDatabase(Database db) {
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenReturn(GetDatabaseResponse.builder().database(db).build());
+  }
+
+  /** Makes every getDatabase call fail with EntityNotFoundException. */
+  private void stubGetDatabaseNotFound() {
+    when(mockClient.getDatabase(any(GetDatabaseRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not found").build());
+  }
+
+  /** Makes every updateDatabase call succeed with an empty response. */
+  private void stubUpdateDatabase() {
+    when(mockClient.updateDatabase(any(UpdateDatabaseRequest.class)))
+        .thenReturn(UpdateDatabaseResponse.builder().build());
+  }
+
+  /** Makes every getTables call return the given tables. */
+  private void stubGetTables(List<Table> tables) {
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(GetTablesResponse.builder().tableList(tables).build());
+  }
+
+  // -------------------------------------------------------------------------
+  // listSchemas
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testListSchemas_paginated() {
+    GetDatabasesResponse firstPage =
+        GetDatabasesResponse.builder()
+            .databaseList(database("db1"), database("db2"))
+            .nextToken("token1")
+            .build();
+    GetDatabasesResponse lastPage =
+        GetDatabasesResponse.builder()
+            .databaseList(database("db3"), database("db4"))
+            .nextToken(null)
+            .build();
+    when(mockClient.getDatabases(any(GetDatabasesRequest.class)))
+        .thenReturn(firstPage, lastPage);
+
+    NameIdentifier[] result = ops.listSchemas(catalogNamespace());
+
+    assertEquals(4, result.length);
+    assertEquals("db1", result[0].name());
+    assertEquals("db4", result[3].name());
+  }
+
+  @Test
+  void testListSchemas_empty() {
+    GetDatabasesResponse emptyPage =
+        GetDatabasesResponse.builder()
+            .databaseList(Collections.emptyList())
+            .nextToken(null)
+            .build();
+    when(mockClient.getDatabases(any(GetDatabasesRequest.class))).thenReturn(emptyPage);
+
+    NameIdentifier[] result = ops.listSchemas(catalogNamespace());
+
+    assertEquals(0, result.length);
+  }
+
+  // -------------------------------------------------------------------------
+  // createSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testCreateSchema_success() {
+    Map<String, String> props = Map.of("k", "v");
+
+    GlueSchema schema = ops.createSchema(schemaIdent("mydb"), "my comment", props);
+
+    verify(mockClient).createDatabase(any(CreateDatabaseRequest.class));
+    assertEquals("mydb", schema.name());
+    assertEquals("my comment", schema.comment());
+    assertEquals(props, schema.properties());
+  }
+
+  @Test
+  void testCreateSchema_alreadyExists() {
+    NameIdentifier ident = schemaIdent("mydb");
+    when(mockClient.createDatabase(any(CreateDatabaseRequest.class)))
+        .thenThrow(AlreadyExistsException.builder().message("exists").build());
+
+    assertThrows(
+        SchemaAlreadyExistsException.class,
+        () -> ops.createSchema(ident, null, Collections.emptyMap()));
+  }
+
+  // -------------------------------------------------------------------------
+  // loadSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testLoadSchema_success() {
+    stubGetDatabase(
+        Database.builder()
+            .name("mydb")
+            .description("desc")
+            .parameters(Map.of("k", "v"))
+            .createTime(Instant.now())
+            .build());
+
+    GlueSchema schema = ops.loadSchema(schemaIdent("mydb"));
+
+    assertEquals("mydb", schema.name());
+    assertEquals("desc", schema.comment());
+    assertEquals(Map.of("k", "v"), schema.properties());
+  }
+
+  @Test
+  void testLoadSchema_notFound() {
+    NameIdentifier ident = schemaIdent("missing");
+    stubGetDatabaseNotFound();
+
+    assertThrows(NoSuchSchemaException.class, () -> ops.loadSchema(ident));
+  }
+
+  // -------------------------------------------------------------------------
+  // alterSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testAlterSchema_setProperty() {
+    stubGetDatabase(
+        Database.builder()
+            .name("mydb")
+            .parameters(Map.of("existing", "val"))
+            .createTime(Instant.now())
+            .build());
+    stubUpdateDatabase();
+    ArgumentCaptor<UpdateDatabaseRequest> captor =
+        ArgumentCaptor.forClass(UpdateDatabaseRequest.class);
+
+    GlueSchema result =
+        ops.alterSchema(schemaIdent("mydb"), SchemaChange.setProperty("newKey", "newVal"));
+
+    verify(mockClient).updateDatabase(captor.capture());
+    Map<String, String> sentParams = captor.getValue().databaseInput().parameters();
+    assertEquals("val", sentParams.get("existing"));
+    assertEquals("newVal", sentParams.get("newKey"));
+    assertEquals("newVal", result.properties().get("newKey"));
+  }
+
+  @Test
+  void testAlterSchema_removeProperty() {
+    stubGetDatabase(
+        Database.builder()
+            .name("mydb")
+            .parameters(Map.of("toRemove", "v"))
+            .createTime(Instant.now())
+            .build());
+    stubUpdateDatabase();
+    ArgumentCaptor<UpdateDatabaseRequest> captor =
+        ArgumentCaptor.forClass(UpdateDatabaseRequest.class);
+
+    ops.alterSchema(schemaIdent("mydb"), SchemaChange.removeProperty("toRemove"));
+
+    verify(mockClient).updateDatabase(captor.capture());
+    Map<String, String> sentParams = captor.getValue().databaseInput().parameters();
+    assertFalse(sentParams.containsKey("toRemove"));
+  }
+
+  @Test
+  void testAlterSchema_unsupportedChange() {
+    NameIdentifier ident = schemaIdent("mydb");
+    stubGetDatabase(
+        Database.builder().name("mydb").parameters(Map.of()).createTime(Instant.now()).build());
+
+    SchemaChange unsupported = mock(SchemaChange.class);
+
+    assertThrows(IllegalArgumentException.class, () -> ops.alterSchema(ident, unsupported));
+  }
+
+  @Test
+  void testAlterSchema_notFound() {
+    NameIdentifier ident = schemaIdent("missing");
+    stubGetDatabaseNotFound();
+
+    assertThrows(
+        NoSuchSchemaException.class,
+        () -> ops.alterSchema(ident, SchemaChange.setProperty("k", "v")));
+  }
+
+  // -------------------------------------------------------------------------
+  // dropSchema
+  // -------------------------------------------------------------------------
+
+  @Test
+  void testDropSchema_cascadeFalse_empty() {
+    stubGetTables(List.of());
+
+    boolean dropped = ops.dropSchema(schemaIdent("mydb"), false);
+
+    verify(mockClient).deleteDatabase(any(DeleteDatabaseRequest.class));
+    assertTrue(dropped);
+  }
+
+  @Test
+  void testDropSchema_cascadeFalse_nonEmpty() {
+    NameIdentifier ident = schemaIdent("mydb");
+    stubGetTables(List.of(Table.builder().name("t1").build()));
+
+    assertThrows(NonEmptySchemaException.class, () -> ops.dropSchema(ident, false));
+    verify(mockClient, never()).deleteDatabase(any(DeleteDatabaseRequest.class));
+  }
+
+  @Test
+  void testDropSchema_cascadeTrue() {
+    boolean dropped = ops.dropSchema(schemaIdent("mydb"), true);
+
+    // With cascade the table listing is skipped entirely.
+    verify(mockClient, never()).getTables(any(GetTablesRequest.class));
+    verify(mockClient).deleteDatabase(any(DeleteDatabaseRequest.class));
+    assertTrue(dropped);
+  }
+
+  @Test
+  void testDropSchema_notFound() {
+    when(mockClient.deleteDatabase(any(DeleteDatabaseRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not found").build());
+
+    boolean dropped = ops.dropSchema(schemaIdent("missing"), true);
+
+    assertFalse(dropped);
+  }
+
+  @Test
+  void testDropSchema_withCatalogId() {
+    ops.catalogId = "123456789012";
+    stubGetTables(List.of());
+    ArgumentCaptor<DeleteDatabaseRequest> captor =
+        ArgumentCaptor.forClass(DeleteDatabaseRequest.class);
+
+    ops.dropSchema(schemaIdent("mydb"), false);
+
+    verify(mockClient).deleteDatabase(captor.capture());
+    assertEquals("123456789012", captor.getValue().catalogId());
+  }
+}
diff --git 
a/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperationsTable.java
 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperationsTable.java
new file mode 100644
index 0000000000..c29d1e2131
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperationsTable.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+
+class TestGlueCatalogOperationsTable {
+
+  private GlueCatalogOperations ops;
+  private GlueClient mockClient;
+
+  /** Builds a fresh operations instance backed by a mocked Glue client before every test. */
+  @BeforeEach
+  void setup() {
+    ops = new GlueCatalogOperations();
+    mockClient = mock(GlueClient.class);
+    // Inject the mock directly through the field so no real client is involved.
+    ops.glueClient = mockClient;
+  }
+
+  // -------------------------------------------------------------------------
+  // listTables
+  // -------------------------------------------------------------------------
+
+  /** listTables follows {@code nextToken} and returns the tables of every page, in order. */
+  @Test
+  void testListTables_paginated() {
+    Namespace ns = Namespace.of("metalake", "catalog", "mydb");
+    Table t1 = Table.builder().name("t1").build();
+    Table t2 = Table.builder().name("t2").build();
+    Table t3 = Table.builder().name("t3").build();
+
+    // First page carries a continuation token; the second page ends the listing.
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(GetTablesResponse.builder().tableList(t1, t2).nextToken("tok").build())
+        .thenReturn(GetTablesResponse.builder().tableList(t3).nextToken(null).build());
+
+    NameIdentifier[] result = ops.listTables(ns);
+
+    assertEquals(3, result.length);
+    assertEquals("t1", result[0].name());
+    assertEquals("t2", result[1].name());
+    assertEquals("t3", result[2].name());
+  }
+
+  /** A Glue EntityNotFoundException while listing tables surfaces as NoSuchSchemaException. */
+  @Test
+  void testListTables_schemaNotFound() {
+    Namespace ns = Namespace.of("metalake", "catalog", "missing");
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not found").build());
+
+    assertThrows(NoSuchSchemaException.class, () -> ops.listTables(ns));
+  }
+
+  /** Only tables whose format parameter matches the configured filter are listed. */
+  @Test
+  void testListTables_formatFilter() {
+    ops.tableFormatFilter = java.util.Set.of("iceberg");
+    Namespace ns = Namespace.of("metalake", "catalog", "mydb");
+
+    // The filter is lower-case while the table parameter is upper-case, so a match here also
+    // exercises case-insensitive comparison.
+    Table icebergTable =
+        Table.builder()
+            .name("ice_tbl")
+            .parameters(Map.of(GlueConstants.TABLE_FORMAT, "ICEBERG"))
+            .build();
+    // No format parameter at all: expected to be filtered out.
+    Table hiveTable = Table.builder().name("hive_tbl").parameters(Collections.emptyMap()).build();
+
+    when(mockClient.getTables(any(GetTablesRequest.class)))
+        .thenReturn(
+            GetTablesResponse.builder().tableList(icebergTable, hiveTable).nextToken(null).build());
+
+    NameIdentifier[] result = ops.listTables(ns);
+
+    assertEquals(1, result.length);
+    assertEquals("ice_tbl", result[0].name());
+  }
+
+  // -------------------------------------------------------------------------
+  // loadTable
+  // -------------------------------------------------------------------------
+
+  /** loadTable maps the Glue table's name, description and columns onto the GlueTable. */
+  @Test
+  void testLoadTable_success() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable");
+    // The Glue model Column is fully qualified because this file imports Gravitino's Column.
+    Table glueTable =
+        Table.builder()
+            .name("mytable")
+            .description("desc")
+            .storageDescriptor(
+                StorageDescriptor.builder()
+                    .columns(
+                        software.amazon.awssdk.services.glue.model.Column.builder()
+                            .name("id")
+                            .type("bigint")
+                            .build())
+                    .build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+
+    GlueTable result = ops.loadTable(ident);
+
+    assertEquals("mytable", result.name());
+    assertEquals("desc", result.comment());
+    assertEquals(1, result.columns().length);
+    assertEquals("id", result.columns()[0].name());
+  }
+
+  /** A Glue EntityNotFoundException on getTable surfaces as NoSuchTableException. */
+  @Test
+  void testLoadTable_notFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "missing");
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not found").build());
+
+    assertThrows(NoSuchTableException.class, () -> ops.loadTable(ident));
+  }
+
+  // -------------------------------------------------------------------------
+  // createTable
+  // -------------------------------------------------------------------------
+
+  /** createTable issues a CreateTable call and returns a table echoing name, comment, columns. */
+  @Test
+  void testCreateTable_success() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable");
+    Column[] columns = {
+      GlueColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(true).build(),
+      GlueColumn.builder()
+          .withName("name")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .build()
+    };
+
+    // No partitioning, distribution, sort order or indexes: the plain-table path.
+    GlueTable result =
+        ops.createTable(
+            ident,
+            columns,
+            "my comment",
+            Map.of(GlueConstants.LOCATION, "s3://bucket/path"),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            SortOrders.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    verify(mockClient).createTable(any(CreateTableRequest.class));
+    assertEquals("mytable", result.name());
+    assertEquals("my comment", result.comment());
+    assertEquals(2, result.columns().length);
+  }
+
+  /** A Glue AlreadyExistsException on createTable surfaces as TableAlreadyExistsException. */
+  @Test
+  void testCreateTable_alreadyExists() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable");
+    when(mockClient.createTable(any(CreateTableRequest.class)))
+        .thenThrow(AlreadyExistsException.builder().message("exists").build());
+
+    assertThrows(
+        TableAlreadyExistsException.class,
+        () ->
+            ops.createTable(
+                ident,
+                new Column[0],
+                null,
+                Collections.emptyMap(),
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                SortOrders.NONE,
+                Indexes.EMPTY_INDEXES));
+  }
+
+  /** Passing a non-empty index array to createTable is rejected as an illegal argument. */
+  @Test
+  void testCreateTable_indexesRejected() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable");
+
+    // A single mocked Index is enough; the expectation is rejection on mere presence.
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.createTable(
+                ident,
+                new Column[0],
+                null,
+                Collections.emptyMap(),
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                SortOrders.NONE,
+                new org.apache.gravitino.rel.indexes.Index[] {
+                  mock(org.apache.gravitino.rel.indexes.Index.class)
+                }));
+  }
+
+  /**
+   * Reserved properties are lifted out of the generic parameters map: the location lands on the
+   * storage descriptor and the table type on the table input.
+   */
+  @Test
+  void testCreateTable_storageDescriptorProperties() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable");
+    Map<String, String> props =
+        Map.of(
+            GlueConstants.LOCATION, "s3://my-bucket/path",
+            GlueConstants.INPUT_FORMAT, "org.apache.hadoop.mapred.TextInputFormat",
+            GlueConstants.OUTPUT_FORMAT,
+                "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
+            GlueConstants.TABLE_TYPE, "EXTERNAL_TABLE");
+
+    ArgumentCaptor<CreateTableRequest> captor = ArgumentCaptor.forClass(CreateTableRequest.class);
+
+    ops.createTable(
+        ident,
+        new Column[0],
+        "comment",
+        props,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        SortOrders.NONE,
+        Indexes.EMPTY_INDEXES);
+
+    verify(mockClient).createTable(captor.capture());
+    CreateTableRequest req = captor.getValue();
+    assertEquals("EXTERNAL_TABLE", req.tableInput().tableType());
+    assertEquals("s3://my-bucket/path", req.tableInput().storageDescriptor().location());
+    // The reserved keys must not also be duplicated into the free-form parameters.
+    assertFalse(req.tableInput().parameters().containsKey(GlueConstants.LOCATION));
+    assertFalse(req.tableInput().parameters().containsKey(GlueConstants.TABLE_TYPE));
+  }
+
+  // -------------------------------------------------------------------------
+  // alterTable
+  // -------------------------------------------------------------------------
+
+  /** A rename plus a comment update is sent in one UpdateTable call and shows in the result. */
+  @Test
+  void testAlterTable_renameAndComment() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "old");
+    Table glueTable =
+        Table.builder()
+            .name("old")
+            .description("old comment")
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+    when(mockClient.updateTable(any(UpdateTableRequest.class)))
+        .thenReturn(UpdateTableResponse.builder().build());
+
+    ArgumentCaptor<UpdateTableRequest> captor = ArgumentCaptor.forClass(UpdateTableRequest.class);
+
+    GlueTable result =
+        ops.alterTable(ident, TableChange.rename("new"), TableChange.updateComment("new comment"));
+
+    // verify(...) without a mode also checks that updateTable was invoked exactly once.
+    verify(mockClient).updateTable(captor.capture());
+    assertEquals("new", captor.getValue().tableInput().name());
+    assertEquals("new comment", result.comment());
+  }
+
+  /** Setting a property adds the new key while keeping the table's existing parameters. */
+  @Test
+  void testAlterTable_setProperty() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t");
+    Table glueTable =
+        Table.builder()
+            .name("t")
+            .parameters(Map.of("existing", "v1"))
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+    when(mockClient.updateTable(any(UpdateTableRequest.class)))
+        .thenReturn(UpdateTableResponse.builder().build());
+
+    GlueTable result = ops.alterTable(ident, TableChange.setProperty("newKey", "newVal"));
+
+    assertEquals("newVal", result.properties().get("newKey"));
+    assertEquals("v1", result.properties().get("existing"));
+  }
+
+  /** Adding a column appends it after the table's existing columns. */
+  @Test
+  void testAlterTable_addColumn() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t");
+    Table glueTable =
+        Table.builder()
+            .name("t")
+            .storageDescriptor(
+                StorageDescriptor.builder()
+                    .columns(
+                        List.of(
+                            software.amazon.awssdk.services.glue.model.Column.builder()
+                                .name("id")
+                                .type("bigint")
+                                .build()))
+                    .build())
+            .createTime(Instant.now())
+            .build();
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenReturn(GetTableResponse.builder().table(glueTable).build());
+    when(mockClient.updateTable(any(UpdateTableRequest.class)))
+        .thenReturn(UpdateTableResponse.builder().build());
+
+    GlueTable result =
+        ops.alterTable(
+            ident, TableChange.addColumn(new String[] {"email"}, Types.StringType.get()));
+
+    // The pre-existing "id" column stays at index 0; the new column follows it.
+    assertEquals(2, result.columns().length);
+    assertEquals("email", result.columns()[1].name());
+  }
+
+  @Test
+  void testAlterTable_notFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", 
"missing");
+    when(mockClient.getTable(any(GetTableRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not 
found").build());
+
+    assertThrows(
+        NoSuchTableException.class, () -> ops.alterTable(ident, 
TableChange.updateComment("x")));
+  }
+
+  // -------------------------------------------------------------------------
+  // dropTable
+  // -------------------------------------------------------------------------
+
+  /** dropTable issues a DeleteTable call and reports {@code true} when it succeeds. */
+  @Test
+  void testDropTable_success() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t");
+
+    boolean result = ops.dropTable(ident);
+
+    verify(mockClient).deleteTable(any(DeleteTableRequest.class));
+    assertTrue(result);
+  }
+
+  /** Dropping a table that Glue reports as missing returns {@code false} instead of throwing. */
+  @Test
+  void testDropTable_notFound() {
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "missing");
+    when(mockClient.deleteTable(any(DeleteTableRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not found").build());
+
+    assertFalse(ops.dropTable(ident));
+  }
+
+  /** The DeleteTable request carries the configured catalog id, database and table name. */
+  @Test
+  void testDropTable_withCatalogId() {
+    ops.catalogId = "123456789012";
+    NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t");
+    ArgumentCaptor<DeleteTableRequest> captor = ArgumentCaptor.forClass(DeleteTableRequest.class);
+
+    ops.dropTable(ident);
+
+    verify(mockClient).deleteTable(captor.capture());
+    assertEquals("123456789012", captor.getValue().catalogId());
+    // Database and table come from the last two levels of the identifier.
+    assertEquals("mydb", captor.getValue().databaseName());
+    assertEquals("t", captor.getValue().name());
+  }
+}
diff --git 
a/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java
 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java
new file mode 100644
index 0000000000..9be30bcced
--- /dev/null
+++ 
b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.glue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.partitions.IdentityPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+
+class TestGlueTableOperations {
+
+  private GlueClient mockClient;
+  private GlueTableOperations ops;
+
+  /**
+   * Creates partition operations for {@code mydb.mytable} over a mocked Glue client, with no
+   * catalog id and the two partition columns {@code dt} and {@code region}.
+   */
+  @BeforeEach
+  void setup() {
+    mockClient = mock(GlueClient.class);
+    ops =
+        new GlueTableOperations(mockClient, null, "mydb", "mytable", new String[] {"dt", "region"});
+  }
+
+  // -------------------------------------------------------------------------
+  // listPartitionNames
+  // -------------------------------------------------------------------------
+
+  /** Partition names are collected across pages and rendered as {@code col=value/col=value}. */
+  @Test
+  void testListPartitionNames_paginated() {
+    software.amazon.awssdk.services.glue.model.Partition p1 =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-01", "us")
+            .build();
+    software.amazon.awssdk.services.glue.model.Partition p2 =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-02", "eu")
+            .build();
+
+    // First page carries a continuation token; the second page ends the listing.
+    when(mockClient.getPartitions(any(GetPartitionsRequest.class)))
+        .thenReturn(GetPartitionsResponse.builder().partitions(p1).nextToken("tok").build())
+        .thenReturn(GetPartitionsResponse.builder().partitions(p2).nextToken(null).build());
+
+    String[] names = ops.listPartitionNames();
+
+    assertEquals(2, names.length);
+    assertEquals("dt=2024-01-01/region=us", names[0]);
+    assertEquals("dt=2024-01-02/region=eu", names[1]);
+  }
+
+  /** A table without partitions yields an empty name array. */
+  @Test
+  void testListPartitionNames_empty() {
+    when(mockClient.getPartitions(any(GetPartitionsRequest.class)))
+        .thenReturn(GetPartitionsResponse.builder().partitions(List.of()).nextToken(null).build());
+
+    assertEquals(0, ops.listPartitionNames().length);
+  }
+
+  // -------------------------------------------------------------------------
+  // listPartitions
+  // -------------------------------------------------------------------------
+
+  /** A Glue partition is returned as an IdentityPartition with a derived name and its values. */
+  @Test
+  void testListPartitions_success() {
+    software.amazon.awssdk.services.glue.model.Partition p =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-01", "us")
+            .build();
+
+    when(mockClient.getPartitions(any(GetPartitionsRequest.class)))
+        .thenReturn(GetPartitionsResponse.builder().partitions(p).nextToken(null).build());
+
+    Partition[] partitions = ops.listPartitions();
+
+    assertEquals(1, partitions.length);
+    // The cast doubles as an assertion that the returned partition is an identity partition.
+    IdentityPartition ip = (IdentityPartition) partitions[0];
+    assertEquals("dt=2024-01-01/region=us", ip.name());
+    assertEquals("2024-01-01", ip.values()[0].value());
+    assertEquals("us", ip.values()[1].value());
+  }
+
+  // -------------------------------------------------------------------------
+  // getPartition
+  // -------------------------------------------------------------------------
+
+  /** The partition name is parsed into ordered values for the GetPartition request. */
+  @Test
+  void testGetPartition_success() {
+    software.amazon.awssdk.services.glue.model.Partition gluePartition =
+        software.amazon.awssdk.services.glue.model.Partition.builder()
+            .values("2024-01-01", "us")
+            .build();
+
+    ArgumentCaptor<GetPartitionRequest> captor = ArgumentCaptor.forClass(GetPartitionRequest.class);
+    when(mockClient.getPartition(any(GetPartitionRequest.class)))
+        .thenReturn(GetPartitionResponse.builder().partition(gluePartition).build());
+
+    Partition result = ops.getPartition("dt=2024-01-01/region=us");
+
+    verify(mockClient).getPartition(captor.capture());
+    // Only the values, in partition-column order, are sent to Glue.
+    assertEquals(List.of("2024-01-01", "us"), captor.getValue().partitionValues());
+    assertEquals("dt=2024-01-01/region=us", result.name());
+  }
+
+  /** A Glue EntityNotFoundException on getPartition surfaces as NoSuchPartitionException. */
+  @Test
+  void testGetPartition_notFound() {
+    when(mockClient.getPartition(any(GetPartitionRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not found").build());
+
+    assertThrows(NoSuchPartitionException.class, () -> ops.getPartition("dt=2024-01-01/region=us"));
+  }
+
+  // -------------------------------------------------------------------------
+  // addPartition
+  // -------------------------------------------------------------------------
+
+  /** Adding an identity partition sends its literal values and returns the same name. */
+  @Test
+  void testAddPartition_success() {
+    IdentityPartition partition =
+        Partitions.identity(
+            "dt=2024-01-01/region=us",
+            new String[][] {{"dt"}, {"region"}},
+            new org.apache.gravitino.rel.expressions.literals.Literal[] {
+              Literals.stringLiteral("2024-01-01"), Literals.stringLiteral("us")
+            },
+            java.util.Collections.emptyMap());
+
+    ArgumentCaptor<CreatePartitionRequest> captor =
+        ArgumentCaptor.forClass(CreatePartitionRequest.class);
+
+    Partition result = ops.addPartition(partition);
+
+    verify(mockClient).createPartition(captor.capture());
+    assertEquals(List.of("2024-01-01", "us"), captor.getValue().partitionInput().values());
+    assertEquals("dt=2024-01-01/region=us", result.name());
+  }
+
+  /** A Glue AlreadyExistsException on createPartition maps to PartitionAlreadyExistsException. */
+  @Test
+  void testAddPartition_alreadyExists() {
+    IdentityPartition partition =
+        Partitions.identity(
+            "dt=2024-01-01/region=us",
+            new String[][] {{"dt"}, {"region"}},
+            new org.apache.gravitino.rel.expressions.literals.Literal[] {
+              Literals.stringLiteral("2024-01-01"), Literals.stringLiteral("us")
+            },
+            java.util.Collections.emptyMap());
+
+    when(mockClient.createPartition(any(CreatePartitionRequest.class)))
+        .thenThrow(AlreadyExistsException.builder().message("exists").build());
+
+    assertThrows(PartitionAlreadyExistsException.class, () -> ops.addPartition(partition));
+  }
+
+  /** A Partition that is not an IdentityPartition is rejected as an illegal argument. */
+  @Test
+  void testAddPartition_nonIdentityRejected() {
+    // A bare mock of the Partition interface is, by construction, not an IdentityPartition.
+    Partition nonIdentity = mock(Partition.class);
+    assertThrows(IllegalArgumentException.class, () -> ops.addPartition(nonIdentity));
+  }
+
+  // -------------------------------------------------------------------------
+  // dropPartition
+  // -------------------------------------------------------------------------
+
+  /** dropPartition sends parsed values plus database and table names, and returns true. */
+  @Test
+  void testDropPartition_success() {
+    ArgumentCaptor<DeletePartitionRequest> captor =
+        ArgumentCaptor.forClass(DeletePartitionRequest.class);
+
+    boolean result = ops.dropPartition("dt=2024-01-01/region=us");
+
+    verify(mockClient).deletePartition(captor.capture());
+    assertTrue(result);
+    assertEquals(List.of("2024-01-01", "us"), captor.getValue().partitionValues());
+    assertEquals("mydb", captor.getValue().databaseName());
+    assertEquals("mytable", captor.getValue().tableName());
+  }
+
+  /** Dropping a partition that Glue reports as missing returns {@code false}. */
+  @Test
+  void testDropPartition_notFound() {
+    when(mockClient.deletePartition(any(DeletePartitionRequest.class)))
+        .thenThrow(EntityNotFoundException.builder().message("not found").build());
+
+    assertFalse(ops.dropPartition("dt=2024-01-01/region=us"));
+  }
+
+  /** A catalog id given at construction is forwarded on the DeletePartition request. */
+  @Test
+  void testDropPartition_withCatalogId() {
+    // Uses its own instance because the shared fixture is built without a catalog id.
+    GlueTableOperations opsWithId =
+        new GlueTableOperations(mockClient, "123456789012", "mydb", "mytable", new String[] {"dt"});
+    ArgumentCaptor<DeletePartitionRequest> captor =
+        ArgumentCaptor.forClass(DeletePartitionRequest.class);
+
+    opsWithId.dropPartition("dt=2024-01-01");
+
+    verify(mockClient).deletePartition(captor.capture());
+    assertEquals("123456789012", captor.getValue().catalogId());
+  }
+}

Reply via email to