jackye1995 commented on code in PR #6179: URL: https://github.com/apache/iceberg/pull/6179#discussion_r1021714066
########## aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java: ########## @@ -624,4 +642,153 @@ public void setConf(Configuration conf) { protected Map<String, String> properties() { return catalogProperties == null ? ImmutableMap.of() : catalogProperties; } + + private void updateTableTag(TableIdentifier from, TableIdentifier to) { + // should update tag when the rename process is successful + TableOperations ops = newTableOps(to); + TableMetadata lastMetadata = null; + try { + lastMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn( + "Failed to load table metadata for table: {}, continuing rename without re-tag", to, e); + } + Set<Tag> oldTags = Sets.newHashSet(); + Set<Tag> newTags = Sets.newHashSet(); + boolean skipNameValidation = awsProperties.glueCatalogSkipNameValidation(); + if (awsProperties.s3WriteTableTagEnabled()) { + oldTags.add( + Tag.builder() + .key(AwsProperties.S3_TAG_ICEBERG_TABLE) + .value(IcebergToGlueConverter.getTableName(from, skipNameValidation)) + .build()); + newTags.add( + Tag.builder() + .key(AwsProperties.S3_TAG_ICEBERG_TABLE) + .value(IcebergToGlueConverter.getTableName(to, skipNameValidation)) + .build()); + } + + if (awsProperties.s3WriteNamespaceTagEnabled()) { + oldTags.add( + Tag.builder() + .key(AwsProperties.S3_TAG_ICEBERG_NAMESPACE) + .value(IcebergToGlueConverter.getDatabaseName(from, skipNameValidation)) + .build()); + newTags.add( + Tag.builder() + .key(AwsProperties.S3_TAG_ICEBERG_NAMESPACE) + .value(IcebergToGlueConverter.getDatabaseName(to, skipNameValidation)) + .build()); + } + + if (lastMetadata != null && ops.io() instanceof S3FileIO) { + updateTableTag((S3FileIO) ops.io(), lastMetadata, oldTags, newTags); + } + } + + private void updateTableTag( + S3FileIO io, TableMetadata metadata, Set<Tag> oldTags, Set<Tag> newTags) { + Set<String> manifestListsToUpdate = Sets.newHashSet(); + Set<ManifestFile> manifestsToUpdate = Sets.newHashSet(); + for (Snapshot snapshot : metadata.snapshots()) 
{ + // add all manifests to the delete set because both data and delete files should be removed + Iterables.addAll(manifestsToUpdate, snapshot.allManifests(io)); + // add the manifest list to the delete set, if present + if (snapshot.manifestListLocation() != null) { + manifestListsToUpdate.add(snapshot.manifestListLocation()); + } + } + + LOG.info("Manifests to update: {}", Joiner.on(", ").join(manifestsToUpdate)); + + boolean gcEnabled = + PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT); + + if (gcEnabled) { + // update data files only if we are sure this won't corrupt other tables + updateFilesTag(io, manifestsToUpdate, oldTags, newTags); + } + + updateFilesTag( + io, + Iterables.transform(manifestsToUpdate, ManifestFile::path), + "manifest", + true, + oldTags, + newTags); + updateFilesTag(io, manifestListsToUpdate, "manifest list", true, oldTags, newTags); + updateFilesTag( + io, + Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file), + "previous metadata", + true, + oldTags, + newTags); + updateFileTag(io, metadata.metadataFileLocation(), "metadata", oldTags, newTags); + } + + private void updateFilesTag( + S3FileIO io, Set<ManifestFile> allManifests, Set<Tag> oldTags, Set<Tag> newTags) { + Map<String, Boolean> updatedFiles = + new MapMaker().concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE).weakKeys().makeMap(); + + Tasks.foreach(allManifests) + .noRetry() + .suppressFailureWhenFinished() + .executeWith(ThreadPools.getWorkerPool()) + .onFailure( + (item, exc) -> + LOG.warn("Failed to get updated files: this may cause orphaned data files", exc)) + .run( + manifest -> { + if (manifest.content() == ManifestContent.DATA) { + List<String> pathsToUpdated = Lists.newArrayList(); + CloseableIterable<String> filePaths = ManifestFiles.readPaths(manifest, io); + for (String rawFilePath : filePaths) { + // intern the file path because the weak key map uses identity (==) instead of + // equals + 
String path = rawFilePath.intern(); + Boolean alreadyUpdated = updatedFiles.putIfAbsent(path, true); + if (alreadyUpdated == null || !alreadyUpdated) { + pathsToUpdated.add(path); + } + } + updateFilesTag(io, pathsToUpdated, "data", false, oldTags, newTags); + } + }); + } + + private void updateFilesTag( + S3FileIO io, + Iterable<String> files, + String type, + boolean concurrent, Review Comment: Looks like we always use `true` for `concurrent`; can we just have that code path for now? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org