diqiu50 commented on code in PR #10742:
URL: https://github.com/apache/gravitino/pull/10742#discussion_r3071255789


##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -45,7 +47,12 @@ public static void fetchFileFromUri(
           break;
 
         case "file":
-          Files.createSymbolicLink(destFile.toPath(), new 
File(uri.getPath()).toPath());
+          // Synchronize on the canonical dest path to prevent concurrent 
threads from
+          // racing between deleteIfExists and createSymbolicLink for the same 
keytab file.
+          synchronized (destFile.getAbsolutePath().intern()) {
+            Files.deleteIfExists(destFile.toPath());
+            Files.createSymbolicLink(destFile.toPath(), new 
File(uri.getPath()).toPath());
+          }

Review Comment:
   Fixed. Replaced `String.intern()` with a `ConcurrentHashMap<String, Object>` 
lock map keyed by the normalized absolute destination path 
(`toAbsolutePath().normalize()`).



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -45,7 +47,12 @@ public static void fetchFileFromUri(
           break;
 
         case "file":
-          Files.createSymbolicLink(destFile.toPath(), new 
File(uri.getPath()).toPath());
+          // Synchronize on the canonical dest path to prevent concurrent 
threads from
+          // racing between deleteIfExists and createSymbolicLink for the same 
keytab file.
+          synchronized (destFile.getAbsolutePath().intern()) {
+            Files.deleteIfExists(destFile.toPath());
+            Files.createSymbolicLink(destFile.toPath(), new 
File(uri.getPath()).toPath());
+          }

Review Comment:
   Fixed. The implementation now checks whether the existing symlink already 
points to the correct target (fast-path skip). When replacement is needed, it 
creates a temporary symlink in the same directory and atomically renames it to 
the destination with `Files.move(..., REPLACE_EXISTING)`, eliminating the 
missing-file window.



##########
catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/kerberos/TestFetchFileUtils.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFetchFileUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestFetchFileUtils.class);
+  private static final int MAX_RETRIES = 8;
+  private static final long INITIAL_RETRY_DELAY_MS = 1000;
+
+  @TempDir File tempDir;
+
+  @Test
+  public void testLinkLocalFile() throws Exception {
+    File srcFile = new File(tempDir, "source");
+    Assertions.assertTrue(srcFile.createNewFile());
+    File destFile = new File(tempDir, "dest");
+
+    FetchFileUtils.fetchFileFromUri(srcFile.toURI().toString(), destFile, 10, 
new Configuration());
+    Assertions.assertTrue(Files.isSymbolicLink(destFile.toPath()));
+    Assertions.assertEquals(srcFile.toPath(), 
Files.readSymbolicLink(destFile.toPath()));
+  }
+
+  @Test
+  public void testConcurrentSymlinkCreation() throws Exception {
+    File srcFile = new File(tempDir, "source_concurrent");
+    Assertions.assertTrue(srcFile.createNewFile());
+    File destFile = new File(tempDir, "dest_concurrent");
+
+    int threadCount = 10;
+    CyclicBarrier barrier = new CyclicBarrier(threadCount);
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    List<Future<?>> futures = new ArrayList<>();

Review Comment:
   Fixed. Wrapped the executor lifecycle in a `try/finally` block to ensure 
`shutdownNow()` is always called even if a future throws.



##########
catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/kerberos/TestFetchFileUtils.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFetchFileUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestFetchFileUtils.class);
+  private static final int MAX_RETRIES = 8;
+  private static final long INITIAL_RETRY_DELAY_MS = 1000;
+
+  @TempDir File tempDir;
+
+  @Test
+  public void testLinkLocalFile() throws Exception {
+    File srcFile = new File(tempDir, "source");
+    Assertions.assertTrue(srcFile.createNewFile());
+    File destFile = new File(tempDir, "dest");
+
+    FetchFileUtils.fetchFileFromUri(srcFile.toURI().toString(), destFile, 10, 
new Configuration());
+    Assertions.assertTrue(Files.isSymbolicLink(destFile.toPath()));
+    Assertions.assertEquals(srcFile.toPath(), 
Files.readSymbolicLink(destFile.toPath()));
+  }
+
+  @Test
+  public void testConcurrentSymlinkCreation() throws Exception {
+    File srcFile = new File(tempDir, "source_concurrent");
+    Assertions.assertTrue(srcFile.createNewFile());
+    File destFile = new File(tempDir, "dest_concurrent");
+
+    int threadCount = 10;
+    CyclicBarrier barrier = new CyclicBarrier(threadCount);
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    List<Future<?>> futures = new ArrayList<>();
+
+    for (int i = 0; i < threadCount; i++) {
+      futures.add(
+          executor.submit(
+              () -> {
+                try {
+                  barrier.await();
+                  FetchFileUtils.fetchFileFromUri(
+                      srcFile.toURI().toString(), destFile, 10, new 
Configuration());
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }));
+    }
+
+    for (Future<?> future : futures) {
+      future.get();
+    }
+    executor.shutdown();
+
+    Assertions.assertTrue(Files.isSymbolicLink(destFile.toPath()));
+    Assertions.assertEquals(srcFile.toPath(), 
Files.readSymbolicLink(destFile.toPath()));
+  }
+
+  @Test
+  public void testIdempotentSymlinkCreation() throws Exception {
+    File srcFile = new File(tempDir, "source_idempotent");
+    Assertions.assertTrue(srcFile.createNewFile());
+    File destFile = new File(tempDir, "dest_idempotent");
+
+    Configuration conf = new Configuration();
+    String uri = srcFile.toURI().toString();
+
+    FetchFileUtils.fetchFileFromUri(uri, destFile, 10, conf);
+    Assertions.assertTrue(Files.isSymbolicLink(destFile.toPath()));
+
+    // Second call to the same dest should succeed without error
+    FetchFileUtils.fetchFileFromUri(uri, destFile, 10, conf);
+    Assertions.assertTrue(Files.isSymbolicLink(destFile.toPath()));
+    Assertions.assertEquals(srcFile.toPath(), 
Files.readSymbolicLink(destFile.toPath()));
+  }
+
+  @Test
+  public void testSymlinkReplacedWithDifferentTarget() throws Exception {
+    File srcFileA = new File(tempDir, "source_a");
+    File srcFileB = new File(tempDir, "source_b");
+    Assertions.assertTrue(srcFileA.createNewFile());
+    Assertions.assertTrue(srcFileB.createNewFile());
+    File destFile = new File(tempDir, "dest_replace");
+
+    Configuration conf = new Configuration();
+
+    FetchFileUtils.fetchFileFromUri(srcFileA.toURI().toString(), destFile, 10, 
conf);
+    Assertions.assertEquals(srcFileA.toPath(), 
Files.readSymbolicLink(destFile.toPath()));
+
+    FetchFileUtils.fetchFileFromUri(srcFileB.toURI().toString(), destFile, 10, 
conf);
+    Assertions.assertEquals(srcFileB.toPath(), 
Files.readSymbolicLink(destFile.toPath()));
+  }
+
+  @Test
+  @Disabled("The network errors in CI maybe cause the test failed")
+  public void testDownloadFromHTTP() throws Exception {
+    File destFile = new File(tempDir, "dest_http");
+    String fileUrl = "https://downloads.apache.org/hadoop/common/KEYS";
+    Configuration conf = new Configuration();

Review Comment:
   Fixed. Removed `testDownloadFromHTTP` along with the associated retry loop, 
constants, and logger.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to