jackye1995 commented on code in PR #6179:
URL: https://github.com/apache/iceberg/pull/6179#discussion_r1021714066


##########
aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java:
##########
@@ -624,4 +642,153 @@ public void setConf(Configuration conf) {
   protected Map<String, String> properties() {
     return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
   }
+
+  /**
+   * Re-tags the S3 objects of a renamed table so their Iceberg table/namespace tags reflect the new
+   * identifier. Meant to run only after the rename itself has succeeded.
+   *
+   * <p>Best-effort: when no S3 tagging is enabled there is nothing to rewrite and the method
+   * returns immediately; when the renamed table's metadata cannot be loaded a warning is logged and
+   * nothing is re-tagged; only tables whose FileIO is an {@link S3FileIO} are re-tagged.
+   *
+   * @param from the identifier the table had before the rename
+   * @param to the identifier the table has after the rename
+   */
+  private void updateTableTag(TableIdentifier from, TableIdentifier to) {
+    Set<Tag> oldTags = Sets.newHashSet();
+    Set<Tag> newTags = Sets.newHashSet();
+    boolean skipNameValidation = awsProperties.glueCatalogSkipNameValidation();
+    if (awsProperties.s3WriteTableTagEnabled()) {
+      oldTags.add(
+          s3Tag(
+              AwsProperties.S3_TAG_ICEBERG_TABLE,
+              IcebergToGlueConverter.getTableName(from, skipNameValidation)));
+      newTags.add(
+          s3Tag(
+              AwsProperties.S3_TAG_ICEBERG_TABLE,
+              IcebergToGlueConverter.getTableName(to, skipNameValidation)));
+    }
+
+    if (awsProperties.s3WriteNamespaceTagEnabled()) {
+      oldTags.add(
+          s3Tag(
+              AwsProperties.S3_TAG_ICEBERG_NAMESPACE,
+              IcebergToGlueConverter.getDatabaseName(from, skipNameValidation)));
+      newTags.add(
+          s3Tag(
+              AwsProperties.S3_TAG_ICEBERG_NAMESPACE,
+              IcebergToGlueConverter.getDatabaseName(to, skipNameValidation)));
+    }
+
+    if (newTags.isEmpty()) {
+      // no S3 tagging is configured: skip loading metadata and walking every file of the table
+      return;
+    }
+
+    // should update tag only when the rename process is successful
+    TableOperations ops = newTableOps(to);
+    TableMetadata lastMetadata;
+    try {
+      lastMetadata = ops.current();
+    } catch (NotFoundException e) {
+      LOG.warn(
+          "Failed to load table metadata for table: {}, continuing rename without re-tag", to, e);
+      return;
+    }
+
+    if (lastMetadata != null && ops.io() instanceof S3FileIO) {
+      updateTableTag((S3FileIO) ops.io(), lastMetadata, oldTags, newTags);
+    }
+  }
+
+  /** Builds an S3 object {@link Tag} with the given key and value. */
+  private static Tag s3Tag(String key, String value) {
+    return Tag.builder().key(key).value(value).build();
+  }
+
+  /**
+   * Replaces {@code oldTags} with {@code newTags} on the files reachable from {@code metadata}:
+   * data files (only when the table's {@code GC_ENABLED} property is true), manifests, manifest
+   * lists, previous metadata files and the current metadata file.
+   *
+   * @param io S3 file IO used to read manifests and to update object tags
+   * @param metadata metadata of the renamed table
+   * @param oldTags tags derived from the table's previous identifier
+   * @param newTags tags derived from the table's new identifier
+   */
+  private void updateTableTag(
+      S3FileIO io, TableMetadata metadata, Set<Tag> oldTags, Set<Tag> newTags) {
+    Set<String> manifestListsToUpdate = Sets.newHashSet();
+    Set<ManifestFile> manifestsToUpdate = Sets.newHashSet();
+    for (Snapshot snapshot : metadata.snapshots()) {
+      // collect the manifests of every snapshot so they (and the files they list) get re-tagged
+      Iterables.addAll(manifestsToUpdate, snapshot.allManifests(io));
+      // the manifest list location can be null; only re-tag it when present
+      if (snapshot.manifestListLocation() != null) {
+        manifestListsToUpdate.add(snapshot.manifestListLocation());
+      }
+    }
+
+    // log counts at INFO; the full manifest list can be very large for big tables
+    LOG.info(
+        "Updating tags for {} manifests and {} manifest lists",
+        manifestsToUpdate.size(),
+        manifestListsToUpdate.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Manifests to update: {}", Joiner.on(", ").join(manifestsToUpdate));
+    }
+
+    boolean gcEnabled =
+        PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT);
+
+    if (gcEnabled) {
+      // when GC is disabled the data files may also be referenced by other tables, so only
+      // re-tag them when we are sure this won't mislabel files belonging to other tables
+      updateFilesTag(io, manifestsToUpdate, oldTags, newTags);
+    }
+
+    updateFilesTag(
+        io,
+        Iterables.transform(manifestsToUpdate, ManifestFile::path),
+        "manifest",
+        true,
+        oldTags,
+        newTags);
+    updateFilesTag(io, manifestListsToUpdate, "manifest list", true, oldTags, newTags);
+    updateFilesTag(
+        io,
+        Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file),
+        "previous metadata",
+        true,
+        oldTags,
+        newTags);
+    updateFileTag(io, metadata.metadataFileLocation(), "metadata", oldTags, newTags);
+  }
+
+  /**
+   * Re-tags the data files referenced by the given manifests, replacing {@code oldTags} with
+   * {@code newTags}. Manifests are processed in parallel on the shared worker pool; a path already
+   * recorded in the weak-key map is not re-tagged again (best effort, since weak entries can be
+   * reclaimed). Only data manifests are read here; files listed in delete manifests are not
+   * re-tagged by this method. Per-manifest failures are logged and suppressed.
+   *
+   * @param io S3 file IO used to read manifests and to update object tags
+   * @param allManifests manifests whose data files should be re-tagged
+   * @param oldTags tags to replace
+   * @param newTags tags to apply in place of {@code oldTags}
+   */
+  private void updateFilesTag(
+      S3FileIO io, Set<ManifestFile> allManifests, Set<Tag> oldTags, Set<Tag> newTags) {
+    Map<String, Boolean> updatedFiles =
+        new MapMaker().concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE).weakKeys().makeMap();
+
+    Tasks.foreach(allManifests)
+        .noRetry()
+        .suppressFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .onFailure(
+            (manifest, exc) ->
+                LOG.warn(
+                    "Failed to update tags for data files in manifest: {}", manifest.path(), exc))
+        .run(
+            manifest -> {
+              if (manifest.content() != ManifestContent.DATA) {
+                return;
+              }
+
+              List<String> pathsToUpdate = Lists.newArrayList();
+              // close the manifest reader once its paths have been consumed
+              try (CloseableIterable<String> filePaths = ManifestFiles.readPaths(manifest, io)) {
+                for (String rawFilePath : filePaths) {
+                  // intern the file path because the weak key map uses identity (==) instead of
+                  // equals
+                  String path = rawFilePath.intern();
+                  // the map only ever stores true, so a null previous value means "not seen yet"
+                  if (updatedFiles.putIfAbsent(path, true) == null) {
+                    pathsToUpdate.add(path);
+                  }
+                }
+              } catch (java.io.IOException e) {
+                throw new java.io.UncheckedIOException(
+                    "Failed to close manifest reader: " + manifest.path(), e);
+              }
+
+              // already running inside a worker-pool task, so tag this manifest's files serially
+              updateFilesTag(io, pathsToUpdate, "data", false, oldTags, newTags);
+            });
+  }
+
+  private void updateFilesTag(
+      S3FileIO io,
+      Iterable<String> files,
+      String type,
+      boolean concurrent,

Review Comment:
   Looks like we always use `true` for `concurrent`; can we just have that code path for now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to