diqiu50 commented on code in PR #10829:
URL: https://github.com/apache/gravitino/pull/10829#discussion_r3158227314
##########
catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java:
##########
@@ -54,11 +145,558 @@ public void testConnection(
String comment,
Map<String, String> properties)
throws Exception {
- // TODO PR-04: call GlueClient.getDatabases() to verify connectivity
+ try {
+ GetDatabasesRequest.Builder req =
GetDatabasesRequest.builder().maxResults(1);
+ applyCatalogId(catalogId, req::catalogId);
+ glueClient.getDatabases(req.build());
+ } catch (GlueException e) {
+ throw new ConnectionFailedException(e, "Failed to connect to AWS Glue: %s", e.getMessage());
+ }
}
@Override
public void close() throws IOException {
- // TODO PR-04: close GlueClient
+ if (glueClient != null) {
+ glueClient.close();
+ glueClient = null;
+ }
+ }
+
+ @Override
+ public NameIdentifier[] listSchemas(Namespace namespace) throws
NoSuchCatalogException {
+ List<NameIdentifier> result = new ArrayList<>();
+ String nextToken = null;
+ try {
+ do {
+ GetDatabasesRequest.Builder req = GetDatabasesRequest.builder();
+ applyCatalogId(catalogId, req::catalogId);
+ if (nextToken != null) req.nextToken(nextToken);
+ GetDatabasesResponse resp = glueClient.getDatabases(req.build());
+ resp.databaseList().stream()
+ .map(db -> NameIdentifier.of(namespace, db.name()))
+ .forEach(result::add);
+ nextToken = resp.nextToken();
+ } while (nextToken != null);
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "listing schemas under " + namespace);
+ }
+ return result.toArray(new NameIdentifier[0]);
+ }
+
+ @Override
+ public GlueSchema createSchema(
+ NameIdentifier ident, String comment, Map<String, String> properties)
+ throws NoSuchCatalogException, SchemaAlreadyExistsException {
+
+ Map<String, String> params = properties != null ? properties :
Collections.emptyMap();
+
+ DatabaseInput input =
+ DatabaseInput.builder().name(ident.name()).description(comment).parameters(params).build();
+
+ CreateDatabaseRequest.Builder req =
CreateDatabaseRequest.builder().databaseInput(input);
+ applyCatalogId(catalogId, req::catalogId);
+
+ try {
+ glueClient.createDatabase(req.build());
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+
+ LOG.info("Created Glue schema (database) {}", ident.name());
+
+ return GlueSchema.builder()
+ .withName(ident.name())
+ .withComment(comment)
+ .withProperties(params)
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+ }
+
+ @Override
+ public GlueSchema loadSchema(NameIdentifier ident) throws
NoSuchSchemaException {
+ GetDatabaseRequest.Builder req =
GetDatabaseRequest.builder().name(ident.name());
+ applyCatalogId(catalogId, req::catalogId);
+ try {
+ GlueSchema schema =
+ GlueSchema.fromGlueDatabase(glueClient.getDatabase(req.build()).database());
+ LOG.info("Loaded Glue schema (database) {}", ident.name());
+ return schema;
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+ }
+
+ @Override
+ public GlueSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
+ throws NoSuchSchemaException {
+
+ GlueSchema current = loadSchema(ident);
+
+ Map<String, String> newProps = new HashMap<>(current.properties());
+
+ for (SchemaChange change : changes) {
+ if (change instanceof SchemaChange.SetProperty) {
+ SchemaChange.SetProperty sp = (SchemaChange.SetProperty) change;
+ newProps.put(sp.getProperty(), sp.getValue());
+ } else if (change instanceof SchemaChange.RemoveProperty) {
+ newProps.remove(((SchemaChange.RemoveProperty) change).getProperty());
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported schema change: " + change.getClass().getSimpleName());
+ }
+ }
+
+ DatabaseInput input =
+ DatabaseInput.builder()
+ .name(ident.name())
+ .description(current.comment())
+ .parameters(newProps)
+ .build();
+
+ UpdateDatabaseRequest.Builder req =
+ UpdateDatabaseRequest.builder().name(ident.name()).databaseInput(input);
+ applyCatalogId(catalogId, req::catalogId);
+
+ try {
+ glueClient.updateDatabase(req.build());
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+
+ LOG.info("Altered Glue schema (database) {}", ident.name());
+
+ return GlueSchema.builder()
+ .withName(ident.name())
+ .withComment(current.comment())
+ .withProperties(newProps)
+ .withAuditInfo(current.auditInfo())
+ .build();
+ }
+
+ @Override
+ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws
NonEmptySchemaException {
+ if (!cascade) {
+ GetTablesRequest.Builder tabReq =
+ GetTablesRequest.builder().databaseName(ident.name()).maxResults(1);
+ applyCatalogId(catalogId, tabReq::catalogId);
+ try {
+ if (!glueClient.getTables(tabReq.build()).tableList().isEmpty()) {
+ throw new NonEmptySchemaException(
+ "Schema %s is not empty. Use cascade=true to drop it with its tables.", ident.name());
+ }
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(
+ e, "checking tables in schema " + ident.name());
+ }
+ }
+
+ DeleteDatabaseRequest.Builder req =
DeleteDatabaseRequest.builder().name(ident.name());
+ applyCatalogId(catalogId, req::catalogId);
+ try {
+ glueClient.deleteDatabase(req.build());
+ LOG.info("Dropped Glue schema (database) {}", ident.name());
+ return true;
+ } catch (EntityNotFoundException e) {
+ return false;
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "schema " +
ident.name());
+ }
+ }
+
+ @Override
+ public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
+ String dbName = schemaName(namespace);
+ List<NameIdentifier> result = new ArrayList<>();
+ String nextToken = null;
+ try {
+ do {
+ GetTablesRequest.Builder req =
GetTablesRequest.builder().databaseName(dbName);
+ applyCatalogId(catalogId, req::catalogId);
+ if (nextToken != null) req.nextToken(nextToken);
+ GetTablesResponse resp = glueClient.getTables(req.build());
+ resp.tableList().stream()
+ .filter(this::matchesFormatFilter)
+ .map(t -> NameIdentifier.of(namespace, t.name()))
+ .forEach(result::add);
+ nextToken = resp.nextToken();
+ } while (nextToken != null);
+ } catch (EntityNotFoundException e) {
+ throw new NoSuchSchemaException(e, "Schema %s does not exist", dbName);
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toSchemaException(e, "listing tables in schema " + dbName);
+ }
+ return result.toArray(new NameIdentifier[0]);
+ }
+
+ @Override
+ public GlueTable loadTable(NameIdentifier ident) throws NoSuchTableException
{
+ String dbName = schemaName(ident.namespace());
+ GetTableRequest.Builder req =
GetTableRequest.builder().databaseName(dbName).name(ident.name());
+ applyCatalogId(catalogId, req::catalogId);
+ try {
+ GlueTable table =
+ GlueTable.fromGlueTable(glueClient.getTable(req.build()).table(),
typeConverter);
+ table.initOpsContext(glueClient, catalogId, dbName);
+ LOG.info("Loaded Glue table {}.{}", dbName, ident.name());
+ return table;
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toTableException(e, "table " +
ident.name());
+ }
+ }
+
+ @Override
+ public GlueTable createTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes)
+ throws NoSuchSchemaException, TableAlreadyExistsException {
+
+ Preconditions.checkArgument(indexes.length == 0, "Glue catalog does not support indexes");
+
+ for (Transform t : partitions) {
+ Preconditions.checkArgument(
+ t instanceof Transforms.IdentityTransform,
+ "Glue catalog only supports identity partitioning, got: %s",
+ t.name());
+ Preconditions.checkArgument(
+ ((Transforms.IdentityTransform) t).fieldName().length == 1,
+ "Glue catalog does not support nested field partitioning");
+ }
+
+ String dbName = schemaName(ident.namespace());
+ Map<String, String> props = properties != null ? properties :
Collections.emptyMap();
+
+ TableInput input =
+ buildTableInput(
+ ident.name(), comment, columns, props, partitions, distribution,
sortOrders);
+
+ CreateTableRequest.Builder req =
+ CreateTableRequest.builder().databaseName(dbName).tableInput(input);
+ applyCatalogId(catalogId, req::catalogId);
+
+ try {
+ glueClient.createTable(req.build());
+ } catch (GlueException e) {
+ throw GlueExceptionConverter.toTableException(e, "table " +
ident.name());
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org