yuqi1129 commented on code in PR #10829: URL: https://github.com/apache/gravitino/pull/10829#discussion_r3145416668
########## catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java: ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.glue; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.gravitino.connector.TableOperations; +import org.apache.gravitino.exceptions.NoSuchPartitionException; +import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.rel.SupportsPartitions; +import org.apache.gravitino.rel.expressions.literals.Literal; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.partitions.IdentityPartition; +import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.CreatePartitionRequest; +import 
software.amazon.awssdk.services.glue.model.DeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetPartitionRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; + +/** + * Table-level partition operations for the AWS Glue Data Catalog. + * + * <p>Implements {@link SupportsPartitions} for Hive-format identity-partitioned tables. Partition + * names follow the Hive convention: {@code col=val/col2=val2}. + * + * <p>Only {@link IdentityPartition} is supported; other partition types throw {@link + * IllegalArgumentException}. + */ +class GlueTableOperations implements TableOperations, SupportsPartitions { + + private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class); + + private final GlueClient glueClient; + /** Nullable — when null, Glue uses the caller's AWS account ID. */ + private final String catalogId; + + private final String dbName; + private final String tableName; + /** Ordered partition column names, matching the table's {@code partitionKeys()} order. 
*/ + private final String[] partitionColNames; + + GlueTableOperations( + GlueClient glueClient, + String catalogId, + String dbName, + String tableName, + String[] partitionColNames) { + this.glueClient = glueClient; + this.catalogId = catalogId; + this.dbName = dbName; + this.tableName = tableName; + this.partitionColNames = partitionColNames; + } + + @Override + public String[] listPartitionNames() { + List<String> names = new ArrayList<>(); + String nextToken = null; + try { + do { + GetPartitionsRequest.Builder req = + GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName); + if (catalogId != null) req.catalogId(catalogId); Review Comment: applyCatalogId(req::catalogId) ########## catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java: ########## @@ -54,11 +145,560 @@ public void testConnection( String comment, Map<String, String> properties) throws Exception { - // TODO PR-04: call GlueClient.getDatabases() to verify connectivity + try { + GetDatabasesRequest.Builder req = GetDatabasesRequest.builder().maxResults(1); + applyCatalogId(req::catalogId); + glueClient.getDatabases(req.build()); + } catch (GlueException e) { + throw new ConnectionFailedException(e, "Failed to connect to AWS Glue: %s", e.getMessage()); + } } @Override public void close() throws IOException { - // TODO PR-04: close GlueClient + if (glueClient != null) { + glueClient.close(); + glueClient = null; + } + } + + @Override + public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { + List<NameIdentifier> result = new ArrayList<>(); + String nextToken = null; + try { + do { + GetDatabasesRequest.Builder req = GetDatabasesRequest.builder(); + applyCatalogId(req::catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetDatabasesResponse resp = glueClient.getDatabases(req.build()); + resp.databaseList().stream() + .map(db -> NameIdentifier.of(namespace, db.name())) + .forEach(result::add); + 
nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "listing schemas under " + namespace); + } + return result.toArray(new NameIdentifier[0]); + } + + @Override + public GlueSchema createSchema( + NameIdentifier ident, String comment, Map<String, String> properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { + + Map<String, String> params = properties != null ? properties : Collections.emptyMap(); + + DatabaseInput input = + DatabaseInput.builder().name(ident.name()).description(comment).parameters(params).build(); + + CreateDatabaseRequest.Builder req = CreateDatabaseRequest.builder().databaseInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.createDatabase(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + + LOG.info("Created Glue schema (database) {}", ident.name()); + + return GlueSchema.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(params) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentUserName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + } + + @Override + public GlueSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { + GetDatabaseRequest.Builder req = GetDatabaseRequest.builder().name(ident.name()); + applyCatalogId(req::catalogId); + try { + GlueSchema schema = + GlueSchema.fromGlueDatabase(glueClient.getDatabase(req.build()).database()); + LOG.info("Loaded Glue schema (database) {}", ident.name()); + return schema; + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + } + + @Override + public GlueSchema alterSchema(NameIdentifier ident, SchemaChange... 
changes) + throws NoSuchSchemaException { + + GlueSchema current = loadSchema(ident); + + Map<String, String> newProps = new HashMap<>(current.properties()); + + for (SchemaChange change : changes) { + if (change instanceof SchemaChange.SetProperty) { + SchemaChange.SetProperty sp = (SchemaChange.SetProperty) change; + newProps.put(sp.getProperty(), sp.getValue()); + } else if (change instanceof SchemaChange.RemoveProperty) { + newProps.remove(((SchemaChange.RemoveProperty) change).getProperty()); + } else { + throw new IllegalArgumentException( + "Unsupported schema change: " + change.getClass().getSimpleName()); + } + } + + DatabaseInput input = + DatabaseInput.builder() + .name(ident.name()) + .description(current.comment()) + .parameters(newProps) + .build(); + + UpdateDatabaseRequest.Builder req = + UpdateDatabaseRequest.builder().name(ident.name()).databaseInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.updateDatabase(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + + LOG.info("Altered Glue schema (database) {}", ident.name()); + + return GlueSchema.builder() + .withName(ident.name()) + .withComment(current.comment()) + .withProperties(newProps) + .withAuditInfo(current.auditInfo()) + .build(); + } + + @Override + public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { + if (!cascade) { + GetTablesRequest.Builder tabReq = + GetTablesRequest.builder().databaseName(ident.name()).maxResults(1); + applyCatalogId(tabReq::catalogId); + try { + if (!glueClient.getTables(tabReq.build()).tableList().isEmpty()) { + throw new NonEmptySchemaException( + "Schema %s is not empty. Use cascade=true to drop it with its tables.", ident.name()); + } + } catch (NonEmptySchemaException e) { + throw e; Review Comment: Remove these two lines. 
########## catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java: ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.glue; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.gravitino.connector.TableOperations; +import org.apache.gravitino.exceptions.NoSuchPartitionException; +import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.rel.SupportsPartitions; +import org.apache.gravitino.rel.expressions.literals.Literal; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.partitions.IdentityPartition; +import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.CreatePartitionRequest; +import 
software.amazon.awssdk.services.glue.model.DeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetPartitionRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; + +/** + * Table-level partition operations for the AWS Glue Data Catalog. + * + * <p>Implements {@link SupportsPartitions} for Hive-format identity-partitioned tables. Partition + * names follow the Hive convention: {@code col=val/col2=val2}. + * + * <p>Only {@link IdentityPartition} is supported; other partition types throw {@link + * IllegalArgumentException}. + */ +class GlueTableOperations implements TableOperations, SupportsPartitions { + + private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class); + + private final GlueClient glueClient; + /** Nullable — when null, Glue uses the caller's AWS account ID. */ + private final String catalogId; + + private final String dbName; + private final String tableName; + /** Ordered partition column names, matching the table's {@code partitionKeys()} order. 
*/ + private final String[] partitionColNames; + + GlueTableOperations( + GlueClient glueClient, + String catalogId, + String dbName, + String tableName, + String[] partitionColNames) { + this.glueClient = glueClient; + this.catalogId = catalogId; + this.dbName = dbName; + this.tableName = tableName; + this.partitionColNames = partitionColNames; + } + + @Override + public String[] listPartitionNames() { + List<String> names = new ArrayList<>(); + String nextToken = null; + try { + do { + GetPartitionsRequest.Builder req = + GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName); + if (catalogId != null) req.catalogId(catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetPartitionsResponse resp = glueClient.getPartitions(req.build()); + for (software.amazon.awssdk.services.glue.model.Partition p : resp.partitions()) { + names.add(buildPartitionName(p.values())); + } + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (GlueException e) { + throw new RuntimeException("Failed to list partitions for table " + tableName, e); + } + return names.toArray(new String[0]); + } + + @Override + public Partition[] listPartitions() { + List<Partition> partitions = new ArrayList<>(); + String nextToken = null; + try { + do { + GetPartitionsRequest.Builder req = + GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName); + if (catalogId != null) req.catalogId(catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetPartitionsResponse resp = glueClient.getPartitions(req.build()); + for (software.amazon.awssdk.services.glue.model.Partition p : resp.partitions()) { + partitions.add(toGravitinoPartition(p)); + } + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (GlueException e) { + throw new RuntimeException("Failed to list partitions for table " + tableName, e); + } + return partitions.toArray(new Partition[0]); + } + + @Override + public Partition getPartition(String partitionName) 
throws NoSuchPartitionException { + List<String> values = parsePartitionName(partitionName); + GetPartitionRequest.Builder req = + GetPartitionRequest.builder() + .databaseName(dbName) + .tableName(tableName) + .partitionValues(values); + if (catalogId != null) req.catalogId(catalogId); + try { + return toGravitinoPartition(glueClient.getPartition(req.build()).partition()); + } catch (EntityNotFoundException e) { + throw new NoSuchPartitionException( + e, "Partition %s does not exist in table %s", partitionName, tableName); + } catch (GlueException e) { + throw new RuntimeException("Failed to get partition " + partitionName, e); + } + } + + @Override + public Partition addPartition(Partition partition) throws PartitionAlreadyExistsException { + Preconditions.checkArgument( + partition instanceof IdentityPartition, "Glue only supports identity partitions"); + IdentityPartition ip = (IdentityPartition) partition; + Preconditions.checkArgument( + ip.values().length == partitionColNames.length, + "Partition values count (%s) must match partition columns count (%s)", + ip.values().length, + partitionColNames.length); + + List<String> values = new ArrayList<>(ip.values().length); + for (Literal<?> v : ip.values()) { + values.add(v.value() != null ? 
v.value().toString() : null); + } + + PartitionInput input = + PartitionInput.builder() + .values(values) + .storageDescriptor(StorageDescriptor.builder().build()) + .build(); + + CreatePartitionRequest.Builder req = + CreatePartitionRequest.builder() + .databaseName(dbName) + .tableName(tableName) + .partitionInput(input); + if (catalogId != null) req.catalogId(catalogId); + + try { + glueClient.createPartition(req.build()); + } catch (AlreadyExistsException e) { + throw new PartitionAlreadyExistsException( + e, "Partition %s already exists in table %s", partition.name(), tableName); + } catch (GlueException e) { + throw new RuntimeException("Failed to add partition " + partition.name(), e); + } + + LOG.info("Added partition {} to {}.{}", partition.name(), dbName, tableName); + return Partitions.identity( + partition.name(), ip.fieldNames(), ip.values(), partition.properties()); + } + + @Override + public void close() { + // GlueClient lifecycle is managed by GlueCatalogOperations; nothing to close here. + } + + @Override + public boolean dropPartition(String partitionName) { + List<String> values = parsePartitionName(partitionName); + DeletePartitionRequest.Builder req = + DeletePartitionRequest.builder() + .databaseName(dbName) + .tableName(tableName) + .partitionValues(values); + if (catalogId != null) req.catalogId(catalogId); + try { + glueClient.deletePartition(req.build()); + LOG.info("Dropped partition {} from {}.{}", partitionName, dbName, tableName); + return true; + } catch (EntityNotFoundException e) { + return false; + } catch (GlueException e) { + throw new RuntimeException("Failed to drop partition " + partitionName, e); + } + } + + /** + * Builds a Hive-style partition name (e.g. {@code dt=2024-01-01/country=us}) from an ordered list + * of Glue partition values. 
+ */ + private String buildPartitionName(List<String> values) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionColNames.length && i < values.size(); i++) { + if (i > 0) sb.append('/'); + sb.append(partitionColNames[i]).append('=').append(values.get(i)); + } + return sb.toString(); + } + + /** + * Parses a Hive-style partition name (e.g. {@code dt=2024-01-01/country=us}) into an ordered list + * of values, validating that the keys match the table's partition columns in order. + */ + private List<String> parsePartitionName(String partitionName) { + String[] parts = partitionName.split("/"); + Preconditions.checkArgument( + parts.length == partitionColNames.length, + "Partition name '%s' has %s segment(s) but table has %s partition column(s)", + partitionName, + parts.length, + partitionColNames.length); + List<String> values = new ArrayList<>(parts.length); + for (int i = 0; i < parts.length; i++) { + int eq = parts[i].indexOf('='); + Preconditions.checkArgument( + eq >= 0 && parts[i].substring(0, eq).equals(partitionColNames[i]), + "Partition segment '%s' does not match expected column '%s'", + parts[i], + partitionColNames[i]); + values.add(parts[i].substring(eq + 1)); + } + return values; + } + + private IdentityPartition toGravitinoPartition( + software.amazon.awssdk.services.glue.model.Partition gluePartition) { + List<String> values = gluePartition.values(); + String name = buildPartitionName(values); + + String[][] fieldNames = new String[partitionColNames.length][]; + Literal<?>[] literals = new Literal<?>[partitionColNames.length]; + for (int i = 0; i < partitionColNames.length; i++) { + fieldNames[i] = new String[] {partitionColNames[i]}; + literals[i] = Literals.stringLiteral(i < values.size() ? values.get(i) : null); + } + return Partitions.identity(name, fieldNames, literals, Collections.emptyMap()); Review Comment: Does Glue only support the identity partitioning? 
########## catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java: ########## @@ -58,12 +62,44 @@ @ToString public class GlueTable extends BaseTable { + /** Glue client injected after construction to support partition operations. */ + private GlueClient glueClient; Review Comment: Why do we need to add `glueClient` here? ########## catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java: ########## @@ -54,11 +145,560 @@ public void testConnection( String comment, Map<String, String> properties) throws Exception { - // TODO PR-04: call GlueClient.getDatabases() to verify connectivity + try { + GetDatabasesRequest.Builder req = GetDatabasesRequest.builder().maxResults(1); + applyCatalogId(req::catalogId); + glueClient.getDatabases(req.build()); + } catch (GlueException e) { + throw new ConnectionFailedException(e, "Failed to connect to AWS Glue: %s", e.getMessage()); + } } @Override public void close() throws IOException { - // TODO PR-04: close GlueClient + if (glueClient != null) { + glueClient.close(); + glueClient = null; + } + } + + @Override + public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { + List<NameIdentifier> result = new ArrayList<>(); + String nextToken = null; + try { + do { + GetDatabasesRequest.Builder req = GetDatabasesRequest.builder(); + applyCatalogId(req::catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetDatabasesResponse resp = glueClient.getDatabases(req.build()); + resp.databaseList().stream() + .map(db -> NameIdentifier.of(namespace, db.name())) + .forEach(result::add); + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "listing schemas under " + namespace); + } + return result.toArray(new NameIdentifier[0]); + } + + @Override + public GlueSchema createSchema( + NameIdentifier ident, String comment, Map<String, String> 
properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { + + Map<String, String> params = properties != null ? properties : Collections.emptyMap(); + + DatabaseInput input = + DatabaseInput.builder().name(ident.name()).description(comment).parameters(params).build(); + + CreateDatabaseRequest.Builder req = CreateDatabaseRequest.builder().databaseInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.createDatabase(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + + LOG.info("Created Glue schema (database) {}", ident.name()); + + return GlueSchema.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(params) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentUserName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + } + + @Override + public GlueSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { + GetDatabaseRequest.Builder req = GetDatabaseRequest.builder().name(ident.name()); + applyCatalogId(req::catalogId); + try { + GlueSchema schema = + GlueSchema.fromGlueDatabase(glueClient.getDatabase(req.build()).database()); + LOG.info("Loaded Glue schema (database) {}", ident.name()); + return schema; + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + } + + @Override + public GlueSchema alterSchema(NameIdentifier ident, SchemaChange... 
changes) + throws NoSuchSchemaException { + + GlueSchema current = loadSchema(ident); + + Map<String, String> newProps = new HashMap<>(current.properties()); + + for (SchemaChange change : changes) { + if (change instanceof SchemaChange.SetProperty) { + SchemaChange.SetProperty sp = (SchemaChange.SetProperty) change; + newProps.put(sp.getProperty(), sp.getValue()); + } else if (change instanceof SchemaChange.RemoveProperty) { + newProps.remove(((SchemaChange.RemoveProperty) change).getProperty()); + } else { + throw new IllegalArgumentException( + "Unsupported schema change: " + change.getClass().getSimpleName()); + } + } + + DatabaseInput input = + DatabaseInput.builder() + .name(ident.name()) + .description(current.comment()) + .parameters(newProps) + .build(); + + UpdateDatabaseRequest.Builder req = + UpdateDatabaseRequest.builder().name(ident.name()).databaseInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.updateDatabase(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + + LOG.info("Altered Glue schema (database) {}", ident.name()); + + return GlueSchema.builder() + .withName(ident.name()) + .withComment(current.comment()) + .withProperties(newProps) + .withAuditInfo(current.auditInfo()) + .build(); + } + + @Override + public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { + if (!cascade) { + GetTablesRequest.Builder tabReq = + GetTablesRequest.builder().databaseName(ident.name()).maxResults(1); + applyCatalogId(tabReq::catalogId); + try { + if (!glueClient.getTables(tabReq.build()).tableList().isEmpty()) { + throw new NonEmptySchemaException( + "Schema %s is not empty. 
Use cascade=true to drop it with its tables.", ident.name()); + } + } catch (NonEmptySchemaException e) { + throw e; + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException( + e, "checking tables in schema " + ident.name()); + } + } + + DeleteDatabaseRequest.Builder req = DeleteDatabaseRequest.builder().name(ident.name()); + applyCatalogId(req::catalogId); + try { + glueClient.deleteDatabase(req.build()); + LOG.info("Dropped Glue schema (database) {}", ident.name()); + return true; + } catch (EntityNotFoundException e) { + return false; + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + } + + @Override + public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { + String dbName = schemaName(namespace); + List<NameIdentifier> result = new ArrayList<>(); + String nextToken = null; + try { + do { + GetTablesRequest.Builder req = GetTablesRequest.builder().databaseName(dbName); + applyCatalogId(req::catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetTablesResponse resp = glueClient.getTables(req.build()); + resp.tableList().stream() + .filter(this::matchesFormatFilter) + .map(t -> NameIdentifier.of(namespace, t.name())) + .forEach(result::add); + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (EntityNotFoundException e) { + throw new NoSuchSchemaException(e, "Schema %s does not exist", dbName); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "listing tables in schema " + dbName); + } + return result.toArray(new NameIdentifier[0]); + } + + @Override + public GlueTable loadTable(NameIdentifier ident) throws NoSuchTableException { + String dbName = schemaName(ident.namespace()); + GetTableRequest.Builder req = GetTableRequest.builder().databaseName(dbName).name(ident.name()); + applyCatalogId(req::catalogId); + try { + GlueTable table = + 
GlueTable.fromGlueTable(glueClient.getTable(req.build()).table(), typeConverter); + table.initOpsContext(glueClient, catalogId, dbName); + LOG.info("Loaded Glue table {}.{}", dbName, ident.name()); + return table; + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + } + + @Override + public GlueTable createTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map<String, String> properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) + throws NoSuchSchemaException, TableAlreadyExistsException { + + Preconditions.checkArgument(indexes.length == 0, "Glue catalog does not support indexes"); + + for (Transform t : partitions) { + Preconditions.checkArgument( + t instanceof Transforms.IdentityTransform, + "Glue catalog only supports identity partitioning, got: %s", + t.name()); + Preconditions.checkArgument( + ((Transforms.IdentityTransform) t).fieldName().length == 1, + "Glue catalog does not support nested field partitioning"); + } + + String dbName = schemaName(ident.namespace()); + Map<String, String> props = properties != null ? properties : Collections.emptyMap(); + + TableInput input = + buildTableInput( + ident.name(), comment, columns, props, partitions, distribution, sortOrders); + + CreateTableRequest.Builder req = + CreateTableRequest.builder().databaseName(dbName).tableInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.createTable(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + + LOG.info("Created Glue table {}.{}", dbName, ident.name()); + + GlueTable created = + GlueTable.builder() + .withName(ident.name()) + .withComment(comment) + .withColumns(columns) + .withProperties(props) + .withPartitioning(partitions) + .withDistribution(distribution != null ? distribution : Distributions.NONE) + .withSortOrders(sortOrders != null ? 
sortOrders : new SortOrder[0]) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentUserName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + created.initOpsContext(glueClient, catalogId, dbName); + return created; + } + + @Override + public GlueTable alterTable(NameIdentifier ident, TableChange... changes) + throws NoSuchTableException, IllegalArgumentException { + + GlueTable current = loadTable(ident); + String dbName = schemaName(ident.namespace()); + + String newName = current.name(); + String newComment = current.comment(); + Map<String, String> newProps = new HashMap<>(current.properties()); + + // Separate data columns from partition columns + int partCount = current.partitioning().length; + List<Column> allCols = new ArrayList<>(Arrays.asList(current.columns())); + List<Column> dataCols = new ArrayList<>(allCols.subList(0, allCols.size() - partCount)); + List<Column> partCols = + new ArrayList<>(allCols.subList(allCols.size() - partCount, allCols.size())); + + for (TableChange change : changes) { + if (change instanceof TableChange.RenameTable) { + newName = ((TableChange.RenameTable) change).getNewName(); + } else if (change instanceof TableChange.UpdateComment) { + newComment = ((TableChange.UpdateComment) change).getNewComment(); + } else if (change instanceof TableChange.SetProperty) { + TableChange.SetProperty sp = (TableChange.SetProperty) change; + newProps.put(sp.getProperty(), sp.getValue()); + } else if (change instanceof TableChange.RemoveProperty) { + newProps.remove(((TableChange.RemoveProperty) change).getProperty()); + } else if (change instanceof TableChange.ColumnChange) { + applyColumnChange(dataCols, partCols, (TableChange.ColumnChange) change); + } else { + throw new IllegalArgumentException( + "Unsupported table change: " + change.getClass().getSimpleName()); + } + } + + List<Column> newAllCols = new ArrayList<>(dataCols); + newAllCols.addAll(partCols); + Column[] newColumns = 
newAllCols.toArray(new Column[0]); + + TableInput input = + buildTableInput( + newName, + newComment, + newColumns, + newProps, + current.partitioning(), + current.distribution(), + current.sortOrder()); + + UpdateTableRequest.Builder req = + UpdateTableRequest.builder().databaseName(dbName).tableInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.updateTable(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + + LOG.info("Altered Glue table {}.{}", dbName, ident.name()); + + GlueTable altered = + GlueTable.builder() + .withName(newName) + .withComment(newComment) + .withColumns(newColumns) + .withProperties(newProps) + .withPartitioning(current.partitioning()) + .withDistribution(current.distribution()) + .withSortOrders(current.sortOrder()) + .withAuditInfo(current.auditInfo()) + .build(); + altered.initOpsContext(glueClient, catalogId, dbName); + return altered; + } + + @Override + public boolean dropTable(NameIdentifier ident) { + String dbName = schemaName(ident.namespace()); + DeleteTableRequest.Builder req = + DeleteTableRequest.builder().databaseName(dbName).name(ident.name()); + applyCatalogId(req::catalogId); + try { + glueClient.deleteTable(req.build()); + LOG.info("Dropped Glue table {}.{}", dbName, ident.name()); + return true; + } catch (EntityNotFoundException e) { + return false; + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + } + + private static String schemaName(Namespace namespace) { + String[] levels = namespace.levels(); + Preconditions.checkArgument( + levels.length >= 2, "Namespace must have at least 2 levels, got: %s", levels.length); + return levels[levels.length - 1]; + } + + // NOTE: parameter type is the Glue SDK Table, not GlueTable (our domain class). 
+ // The Glue SDK's Column model is also referenced by FQN throughout this class because its + // simple name conflicts with the imported org.apache.gravitino.rel.Column. + private boolean matchesFormatFilter(Table table) { + if (tableFormatFilter == null) return true; + String fmt = table.hasParameters() ? table.parameters().get(GlueConstants.TABLE_FORMAT) : null; + String normalized = + fmt != null ? fmt.toLowerCase(Locale.ROOT) : GlueConstants.DEFAULT_TABLE_FORMAT_VALUE; + return tableFormatFilter.contains(normalized); + } + + private TableInput buildTableInput( + String name, + String comment, + Column[] columns, + Map<String, String> properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders) { + + int partCount = partitions.length; + Preconditions.checkArgument( + columns.length >= partCount, + "columns.length (%s) must be >= number of partition columns (%s)", + columns.length, + partCount); + int dataCount = columns.length - partCount; + + List<software.amazon.awssdk.services.glue.model.Column> glueDataCols = new ArrayList<>(); + for (int i = 0; i < dataCount; i++) { + glueDataCols.add(toGlueColumn(columns[i])); + } + + List<software.amazon.awssdk.services.glue.model.Column> gluePartCols = new ArrayList<>(); + for (int i = dataCount; i < columns.length; i++) { + gluePartCols.add(toGlueColumn(columns[i])); + } + + // Separate properties into: SD fields, table-level fields, and Table.parameters() + Map<String, String> serdeParams = new HashMap<>(); + Map<String, String> tableParams = new HashMap<>(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(GlueConstants.SERDE_PARAMETER_PREFIX)) { + serdeParams.put( + key.substring(GlueConstants.SERDE_PARAMETER_PREFIX.length()), entry.getValue()); + } else if (!SD_TABLE_PROPERTY_KEYS.contains(key) && !TABLE_LEVEL_KEYS.contains(key)) { + tableParams.put(key, entry.getValue()); + } + } + + SerDeInfo serDe = + 
SerDeInfo.builder() + .serializationLibrary(properties.get(GlueConstants.SERDE_LIB)) + .name(properties.get(GlueConstants.SERDE_NAME)) + .parameters(serdeParams) + .build(); + + List<String> bucketCols = Collections.emptyList(); + int numBuckets = 0; + if (distribution != null && distribution != Distributions.NONE && distribution.number() > 0) { Review Comment: When will `distribution.number()` be less than or equal to 0 if it's not `Distributions.NONE`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
