diqiu50 commented on code in PR #10829:
URL: https://github.com/apache/gravitino/pull/10829#discussion_r3158299682
##########
catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java:
##########
@@ -54,11 +145,558 @@ public void testConnection(
String comment,
Map<String, String> properties)
throws Exception {
- // TODO PR-04: call GlueClient.getDatabases() to verify connectivity
+ try {
+ GetDatabasesRequest.Builder req =
GetDatabasesRequest.builder().maxResults(1);
+ applyCatalogId(catalogId, req::catalogId);
+ glueClient.getDatabases(req.build());
+ } catch (GlueException e) {
+ throw new ConnectionFailedException(e, "Failed to connect to AWS Glue:
%s", e.getMessage());
+ }
}
@Override
public void close() throws IOException {
- // TODO PR-04: close GlueClient
+ if (glueClient != null) {
+ glueClient.close();
+ glueClient = null;
+ }
+ }
+
+ @Override
+ public NameIdentifier[] listSchemas(Namespace namespace) throws
NoSuchCatalogException {
+ List<NameIdentifier> result = new ArrayList<>();
+ String nextToken = null;
+ try {
+ do {
+ GetDatabasesRequest.Builder req = GetDatabasesRequest.builder();
+ applyCatalogId(catalogId, req::catalogId);
+ if (nextToken != null) req.nextToken(nextToken);
+ GetDatabasesResponse resp = glueClient.getDatabases(req.build());
+ resp.databaseList().stream()
+ .map(db -> NameIdentifier.of(namespace, db.name()))
+ .forEach(result::add);
+ nextToken = resp.nextToken();
+ } while (nextToken != null);
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "listing schemas under
" + namespace);
+ }
+ return result.toArray(new NameIdentifier[0]);
+ }
+
+ @Override
+ public GlueSchema createSchema(
+ NameIdentifier ident, String comment, Map<String, String> properties)
+ throws NoSuchCatalogException, SchemaAlreadyExistsException {
+
+ Map<String, String> params = properties != null ? properties :
Collections.emptyMap();
+
+ DatabaseInput input =
+
DatabaseInput.builder().name(ident.name()).description(comment).parameters(params).build();
+
+ CreateDatabaseRequest.Builder req =
CreateDatabaseRequest.builder().databaseInput(input);
+ applyCatalogId(catalogId, req::catalogId);
+
+ try {
+ glueClient.createDatabase(req.build());
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+
+ LOG.info("Created Glue schema (database) {}", ident.name());
+
+ return GlueSchema.builder()
+ .withName(ident.name())
+ .withComment(comment)
+ .withProperties(params)
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+ }
+
+ @Override
+ public GlueSchema loadSchema(NameIdentifier ident) throws
NoSuchSchemaException {
+ GetDatabaseRequest.Builder req =
GetDatabaseRequest.builder().name(ident.name());
+ applyCatalogId(catalogId, req::catalogId);
+ try {
+ GlueSchema schema =
+
GlueSchema.fromGlueDatabase(glueClient.getDatabase(req.build()).database());
+ LOG.info("Loaded Glue schema (database) {}", ident.name());
+ return schema;
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+ }
+
+ @Override
+ public GlueSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
+ throws NoSuchSchemaException {
+
+ GlueSchema current = loadSchema(ident);
+
+ Map<String, String> newProps = new HashMap<>(current.properties());
+
+ for (SchemaChange change : changes) {
+ if (change instanceof SchemaChange.SetProperty) {
+ SchemaChange.SetProperty sp = (SchemaChange.SetProperty) change;
+ newProps.put(sp.getProperty(), sp.getValue());
+ } else if (change instanceof SchemaChange.RemoveProperty) {
+ newProps.remove(((SchemaChange.RemoveProperty) change).getProperty());
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported schema change: " + change.getClass().getSimpleName());
+ }
+ }
+
+ DatabaseInput input =
+ DatabaseInput.builder()
+ .name(ident.name())
+ .description(current.comment())
+ .parameters(newProps)
+ .build();
+
+ UpdateDatabaseRequest.Builder req =
+
UpdateDatabaseRequest.builder().name(ident.name()).databaseInput(input);
+ applyCatalogId(catalogId, req::catalogId);
+
+ try {
+ glueClient.updateDatabase(req.build());
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+
+ LOG.info("Altered Glue schema (database) {}", ident.name());
+
+ return GlueSchema.builder()
+ .withName(ident.name())
+ .withComment(current.comment())
+ .withProperties(newProps)
+ .withAuditInfo(current.auditInfo())
+ .build();
+ }
+
+ @Override
+ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws
NonEmptySchemaException {
+ if (!cascade) {
+ GetTablesRequest.Builder tabReq =
+ GetTablesRequest.builder().databaseName(ident.name()).maxResults(1);
+ applyCatalogId(catalogId, tabReq::catalogId);
+ try {
+ if (!glueClient.getTables(tabReq.build()).tableList().isEmpty()) {
+ throw new NonEmptySchemaException(
+ "Schema %s is not empty. Use cascade=true to drop it with its
tables.", ident.name());
+ }
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(
+ e, "checking tables in schema " + ident.name());
+ }
+ }
+
+ DeleteDatabaseRequest.Builder req =
DeleteDatabaseRequest.builder().name(ident.name());
+ applyCatalogId(catalogId, req::catalogId);
+ try {
+ glueClient.deleteDatabase(req.build());
+ LOG.info("Dropped Glue schema (database) {}", ident.name());
+ return true;
+ } catch (EntityNotFoundException e) {
+ return false;
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+ }
+
+ @Override
+ public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
+ String dbName = schemaName(namespace);
+ List<NameIdentifier> result = new ArrayList<>();
+ String nextToken = null;
+ try {
+ do {
+ GetTablesRequest.Builder req =
GetTablesRequest.builder().databaseName(dbName);
+ applyCatalogId(catalogId, req::catalogId);
+ if (nextToken != null) req.nextToken(nextToken);
+ GetTablesResponse resp = glueClient.getTables(req.build());
+ resp.tableList().stream()
+ .filter(this::matchesFormatFilter)
+ .map(t -> NameIdentifier.of(namespace, t.name()))
+ .forEach(result::add);
+ nextToken = resp.nextToken();
+ } while (nextToken != null);
+ } catch (EntityNotFoundException e) {
+ throw new NoSuchSchemaException(e, "Schema %s does not exist", dbName);
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "listing tables in
schema " + dbName);
+ }
+ return result.toArray(new NameIdentifier[0]);
+ }
+
+ @Override
+ public GlueTable loadTable(NameIdentifier ident) throws NoSuchTableException
{
+ String dbName = schemaName(ident.namespace());
+ GetTableRequest.Builder req =
GetTableRequest.builder().databaseName(dbName).name(ident.name());
+ applyCatalogId(catalogId, req::catalogId);
+ try {
+ GlueTable table =
+ GlueTable.fromGlueTable(glueClient.getTable(req.build()).table(),
typeConverter);
+ table.initOpsContext(glueClient, catalogId, dbName);
+ LOG.info("Loaded Glue table {}.{}", dbName, ident.name());
+ return table;
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toTableException(e, "table " +
ident.name());
+ }
+ }
+
+ @Override
+ public GlueTable createTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes)
+ throws NoSuchSchemaException, TableAlreadyExistsException {
+
+ Preconditions.checkArgument(indexes.length == 0, "Glue catalog does not
support indexes");
+
+ for (Transform t : partitions) {
+ Preconditions.checkArgument(
+ t instanceof Transforms.IdentityTransform,
+ "Glue catalog only supports identity partitioning, got: %s",
+ t.name());
+ Preconditions.checkArgument(
+ ((Transforms.IdentityTransform) t).fieldName().length == 1,
+ "Glue catalog does not support nested field partitioning");
+ }
+
+ String dbName = schemaName(ident.namespace());
+ Map<String, String> props = properties != null ? properties :
Collections.emptyMap();
+
+ TableInput input =
+ buildTableInput(
+ ident.name(), comment, columns, props, partitions, distribution,
sortOrders);
+
+ CreateTableRequest.Builder req =
+ CreateTableRequest.builder().databaseName(dbName).tableInput(input);
+ applyCatalogId(catalogId, req::catalogId);
+
+ try {
+ glueClient.createTable(req.build());
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toTableException(e, "table " +
ident.name());
+ }
+
+ LOG.info("Created Glue table {}.{}", dbName, ident.name());
+
+ GlueTable created =
+ GlueTable.builder()
+ .withName(ident.name())
+ .withComment(comment)
+ .withColumns(columns)
+ .withProperties(props)
+ .withPartitioning(partitions)
+ .withDistribution(distribution != null ? distribution :
Distributions.NONE)
+ .withSortOrders(sortOrders != null ? sortOrders : new SortOrder[0])
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+ created.initOpsContext(glueClient, catalogId, dbName);
+ return created;
+ }
+
+ @Override
+ public GlueTable alterTable(NameIdentifier ident, TableChange... changes)
+ throws NoSuchTableException, IllegalArgumentException {
+
+ GlueTable current = loadTable(ident);
+ String dbName = schemaName(ident.namespace());
+
+ String newName = current.name();
+ String newComment = current.comment();
+ Map<String, String> newProps = new HashMap<>(current.properties());
+
+ // Separate data columns from partition columns
+ int partCount = current.partitioning().length;
+ List<Column> allCols = new ArrayList<>(Arrays.asList(current.columns()));
+ List<Column> dataCols = new ArrayList<>(allCols.subList(0, allCols.size()
- partCount));
+ List<Column> partCols =
+ new ArrayList<>(allCols.subList(allCols.size() - partCount,
allCols.size()));
+
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.RenameTable) {
+ newName = ((TableChange.RenameTable) change).getNewName();
+ } else if (change instanceof TableChange.UpdateComment) {
+ newComment = ((TableChange.UpdateComment) change).getNewComment();
+ } else if (change instanceof TableChange.SetProperty) {
+ TableChange.SetProperty sp = (TableChange.SetProperty) change;
+ newProps.put(sp.getProperty(), sp.getValue());
+ } else if (change instanceof TableChange.RemoveProperty) {
+ newProps.remove(((TableChange.RemoveProperty) change).getProperty());
+ } else if (change instanceof TableChange.ColumnChange) {
+ applyColumnChange(dataCols, partCols, (TableChange.ColumnChange)
change);
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported table change: " + change.getClass().getSimpleName());
+ }
+ }
+
+ List<Column> newAllCols = new ArrayList<>(dataCols);
+ newAllCols.addAll(partCols);
+ Column[] newColumns = newAllCols.toArray(new Column[0]);
+
+ TableInput input =
+ buildTableInput(
+ newName,
+ newComment,
+ newColumns,
+ newProps,
+ current.partitioning(),
+ current.distribution(),
+ current.sortOrder());
+
+ UpdateTableRequest.Builder req =
+ UpdateTableRequest.builder().databaseName(dbName).tableInput(input);
+ applyCatalogId(catalogId, req::catalogId);
+
+ try {
+ glueClient.updateTable(req.build());
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toTableException(e, "table " +
ident.name());
+ }
+
+ LOG.info("Altered Glue table {}.{}", dbName, ident.name());
+
+ GlueTable altered =
+ GlueTable.builder()
+ .withName(newName)
+ .withComment(newComment)
+ .withColumns(newColumns)
+ .withProperties(newProps)
+ .withPartitioning(current.partitioning())
+ .withDistribution(current.distribution())
+ .withSortOrders(current.sortOrder())
+ .withAuditInfo(current.auditInfo())
+ .build();
+ altered.initOpsContext(glueClient, catalogId, dbName);
+ return altered;
+ }
+
+ @Override
+ public boolean dropTable(NameIdentifier ident) {
+ String dbName = schemaName(ident.namespace());
+ DeleteTableRequest.Builder req =
+ DeleteTableRequest.builder().databaseName(dbName).name(ident.name());
+ applyCatalogId(catalogId, req::catalogId);
+ try {
+ glueClient.deleteTable(req.build());
+ LOG.info("Dropped Glue table {}.{}", dbName, ident.name());
+ return true;
+ } catch (EntityNotFoundException e) {
+ return false;
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toTableException(e, "table " +
ident.name());
+ }
+ }
+
+ private static String schemaName(Namespace namespace) {
+ String[] levels = namespace.levels();
+ Preconditions.checkArgument(
+ levels.length >= 2, "Namespace must have at least 2 levels, got: %s",
levels.length);
+ return levels[levels.length - 1];
+ }
+
+ // NOTE: parameter type is the Glue SDK Table, not GlueTable (our domain
class).
+ // The Glue SDK's Column model is also referenced by FQN throughout this
class because its
+ // simple name conflicts with the imported org.apache.gravitino.rel.Column.
+ private boolean matchesFormatFilter(Table table) {
+ if (tableFormatFilter == null) return true;
+ String fmt = table.hasParameters() ?
table.parameters().get(GlueConstants.TABLE_FORMAT) : null;
+ String normalized =
+ fmt != null ? fmt.toLowerCase(Locale.ROOT) :
GlueConstants.DEFAULT_TABLE_FORMAT_VALUE;
+ return tableFormatFilter.contains(normalized);
+ }
+
+ private TableInput buildTableInput(
+ String name,
+ String comment,
+ Column[] columns,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders) {
+
+ int partCount = partitions.length;
+ Preconditions.checkArgument(
+ columns.length >= partCount,
+ "columns.length (%s) must be >= number of partition columns (%s)",
+ columns.length,
+ partCount);
+ int dataCount = columns.length - partCount;
Review Comment:
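This case (`columns.length == partCount`, i.e. every column is a partition column, which the `>=` check above still allows) will cause the Glue API to throw an error.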
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]