This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 873a6fb0a82ce6dfd5d4b4d9e9394aaeb787f9d0
Author: jackhalfalltrades <chandrakanthperave...@gmail.com>
AuthorDate: Wed Jun 25 23:05:58 2025 -0500

    ATLAS-5061: Fix delay in startup by making the import startup asynchronous 
(#390)
    
    (cherry picked from commit ebf4253c5973e0a76fe4ccfd9601905a20c79b6b)
---
 .../apache/atlas/security/SecurityProperties.java  |   2 +-
 .../atlas/notification/ImportTaskListenerImpl.java | 255 ++++++++++-----------
 .../atlas/web/service/SecureEmbeddedServer.java    |   6 +-
 .../notification/ImportTaskListenerImplTest.java   |  49 ++++
 4 files changed, 168 insertions(+), 144 deletions(-)

diff --git 
a/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java 
b/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java
index 5bfbe8915..912434c04 100644
--- a/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java
+++ b/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java
@@ -45,7 +45,7 @@ public final class SecurityProperties {
     public static final String       ATLAS_SSL_EXCLUDE_PROTOCOLS              
= "atlas.ssl.exclude.protocols";
     public static final String       ATLAS_SSL_ENABLED_PROTOCOLS              
= "atlas.ssl.enabled.protocols";
     public static final String[]     DEFAULT_EXCLUDE_PROTOCOLS                
= new String[] {"TLSv1", "TLSv1.1"};
-    public static final String[]     ATLAS_SSL_DEFAULT_PROTOCOL               
= new String[] { "TLSv1.2" };
+    public static final String[]     ATLAS_SSL_DEFAULT_PROTOCOL               
= new String[] {"TLSv1.2"};
 
     private SecurityProperties() {
     }
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
 
b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
index befa79751..c7489debe 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.apache.atlas.AtlasConfiguration.ASYNC_IMPORT_TOPIC_PREFIX;
 import static org.apache.atlas.AtlasErrorCode.IMPORT_QUEUEING_FAILED;
@@ -95,26 +96,34 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
         startInternal();
     }
 
-    private void startInternal() {
-        CompletableFuture<Void> populateTask = 
CompletableFuture.runAsync(this::populateRequestQueue)
-                .exceptionally(ex -> {
-                    LOG.error("Failed to populate request queue", ex);
-                    return null;
-                });
-
-        CompletableFuture<Void> resumeTask = 
CompletableFuture.runAsync(this::resumeInProgressImports)
-                .exceptionally(ex -> {
-                    LOG.error("Failed to resume in-progress imports", ex);
-                    return null;
-                });
-
-        // Wait for both tasks to complete before proceeding
-        CompletableFuture.allOf(populateTask, resumeTask)
-                .thenRun(this::startNextImportInQueue)
-                .exceptionally(ex -> {
-                    LOG.error("Failed to start next import in queue", ex);
-                    return null;
-                }).join();
+    @Override
+    public void stop() throws AtlasException {
+        try {
+            stopImport();
+        } finally {
+            releaseAsyncImportSemaphore();
+        }
+    }
+
+    @Override
+    public void instanceIsActive() {
+        LOG.info("Reacting to active state: initializing Kafka consumers");
+
+        startInternal();
+    }
+
+    @Override
+    public void instanceIsPassive() {
+        try {
+            stopImport();
+        } finally {
+            releaseAsyncImportSemaphore();
+        }
+    }
+
+    @Override
+    public int getHandlerOrder() {
+        return 
ActiveStateChangeHandler.HandlerOrder.IMPORT_TASK_LISTENER.getOrder();
     }
 
     @Override
@@ -153,7 +162,50 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
         }
     }
 
-    private void startNextImportInQueue() {
+    @PreDestroy
+    public void stopImport() {
+        LOG.info("Shutting down import processor...");
+
+        executorService.shutdown(); // Initiate an orderly shutdown
+
+        try {
+            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
+                LOG.warn("Executor service did not terminate gracefully within 
the timeout. Waiting longer...");
+
+                // Retry shutdown before forcing it
+                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                    LOG.warn("Forcing shutdown...");
+
+                    executorService.shutdownNow();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            LOG.error("Shutdown interrupted. Forcing shutdown...");
+
+            executorService.shutdownNow();
+        }
+
+        LOG.info("Import processor stopped.");
+    }
+
+    @VisibleForTesting
+    void startInternal() {
+        populateRequestQueue();
+
+        if (!requestQueue.isEmpty()) {
+            CompletableFuture.runAsync(this::startNextImportInQueue)
+                    .exceptionally(ex -> {
+                        LOG.error("Failed to start next import in queue", ex);
+
+                        return null;
+                    });
+        }
+    }
+
+    @VisibleForTesting
+    void startNextImportInQueue() {
         LOG.info("==> startNextImportInQueue()");
 
         startAsyncImportIfAvailable(null);
@@ -176,6 +228,7 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
 
             if (isNotValidImportRequest(nextImport)) {
                 releaseAsyncImportSemaphore();
+
                 return;
             }
 
@@ -189,36 +242,12 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
         }
     }
 
-    private void startImportConsumer(AtlasAsyncImportRequest importRequest) {
-        try {
-            LOG.info("==> startImportConsumer(atlasAsyncImportRequest={})", 
importRequest);
-
-            
notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT,
 importRequest.getImportId(), importRequest.getTopicName());
-
-            importRequest.setStatus(ImportStatus.PROCESSING);
-            importRequest.setProcessingStartTime(System.currentTimeMillis());
-        } catch (Exception e) {
-            LOG.error("Failed to start consumer for import: {}, marking import 
as failed", importRequest, e);
-
-            importRequest.setStatus(ImportStatus.FAILED);
-        } finally {
-            asyncImportService.updateImportRequest(importRequest);
-
-            if (ObjectUtils.equals(importRequest.getStatus(), 
ImportStatus.FAILED)) {
-                onCompleteImportRequest(importRequest.getImportId());
-            }
-
-            LOG.info("<== startImportConsumer(atlasAsyncImportRequest={})", 
importRequest);
-        }
-    }
-
     @VisibleForTesting
     AtlasAsyncImportRequest getNextImportFromQueue() {
         LOG.info("==> getNextImportFromQueue()");
 
-        final int               maxRetries = 5;
-        int                     retryCount = 0;
-        AtlasAsyncImportRequest nextImport = null;
+        final int maxRetries = 5;
+        int       retryCount = 0;
 
         while (retryCount < maxRetries) {
             try {
@@ -248,8 +277,10 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
                 return importRequest;
             } catch (InterruptedException e) {
                 LOG.error("Thread interrupted while waiting for importId from 
the queue", e);
+
                 // Restore the interrupt flag
                 Thread.currentThread().interrupt();
+
                 return null;
             }
         }
@@ -265,123 +296,67 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
                 (!ImportStatus.WAITING.equals(importRequest.getStatus()) && 
!ImportStatus.PROCESSING.equals(importRequest.getStatus()));
     }
 
-    private void releaseAsyncImportSemaphore() {
-        LOG.info("==> releaseAsyncImportSemaphore()");
-
-        if (asyncImportSemaphore.availablePermits() == 0) {
-            asyncImportSemaphore.release();
-
-            LOG.info("<== releaseAsyncImportSemaphore()");
-        } else {
-            LOG.info("<== releaseAsyncImportSemaphore(); no lock held");
-        }
-    }
-
     void populateRequestQueue() {
         LOG.info("==> populateRequestQueue()");
 
-        List<String> importRequests = 
asyncImportService.fetchQueuedImportRequests();
+        List<String> queuedImports     = 
asyncImportService.fetchQueuedImportRequests();
+        List<String> inProgressImports = 
asyncImportService.fetchInProgressImportIds();
 
-        try {
-            if (!importRequests.isEmpty()) {
-                for (String request : importRequests) {
-                    try {
-                        if (!requestQueue.offer(request, 5, TimeUnit.SECONDS)) 
{ // Wait up to 5 sec
-                            LOG.warn("populateRequestQueue(): Request {} could 
not be added to the queue", request);
-                        }
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-
-                        LOG.error("populateRequestQueue(): Failed to add 
requests to queue");
-
-                        break; // Exit loop on interruption
-                    }
-                }
+        if (queuedImports.isEmpty() && inProgressImports.isEmpty()) {
+            LOG.info("populateRequestQueue(): no queued asynchronous import 
requests found.");
+        } else {
+            LOG.info("populateRequestQueue(): loaded {} asynchronous import 
requests (in-progress={}, queued={})", (inProgressImports.size() + 
queuedImports.size()), inProgressImports.size(), queuedImports.size());
 
-                LOG.info("populateRequestQueue(): Added {} requests to queue", 
importRequests.size());
-            } else {
-                LOG.warn("populateRequestQueue(): No queued requests found.");
-            }
-        } finally {
-            LOG.info("<== populateRequestQueue()");
+            Stream.concat(inProgressImports.stream(), 
queuedImports.stream()).forEach(this::enqueueImportId);
         }
-    }
 
-    private void resumeInProgressImports() {
-        LOG.info("==> resumeInProgressImports()");
+        LOG.info("<== populateRequestQueue()");
+    }
 
+    private void startImportConsumer(AtlasAsyncImportRequest importRequest) {
         try {
-            String importId = 
asyncImportService.fetchInProgressImportIds().stream().findFirst().orElse(null);
-
-            if (importId == null) {
-                LOG.warn("No imports found to resume");
+            LOG.info("==> startImportConsumer(atlasAsyncImportRequest={})", 
importRequest);
 
-                return;
-            }
+            
notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT,
 importRequest.getImportId(), importRequest.getTopicName());
 
-            LOG.info("Resuming import id={}", importId);
+            importRequest.setStatus(ImportStatus.PROCESSING);
+            importRequest.setProcessingStartTime(System.currentTimeMillis());
+        } catch (Exception e) {
+            LOG.error("Failed to start consumer for import: {}, marking import 
as failed", importRequest, e);
 
-            startAsyncImportIfAvailable(importId);
+            importRequest.setStatus(ImportStatus.FAILED);
         } finally {
-            LOG.info("<== resumeInProgressImports()");
-        }
-    }
-
-    @PreDestroy
-    public void stopImport() {
-        LOG.info("Shutting down import processor...");
-
-        executorService.shutdown(); // Initiate an orderly shutdown
-
-        try {
-            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
-                LOG.warn("Executor service did not terminate gracefully within 
the timeout. Waiting longer...");
-
-                // Retry shutdown before forcing it
-                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
-                    LOG.warn("Forcing shutdown...");
+            asyncImportService.updateImportRequest(importRequest);
 
-                    executorService.shutdownNow();
-                }
+            if (ObjectUtils.equals(importRequest.getStatus(), 
ImportStatus.FAILED)) {
+                onCompleteImportRequest(importRequest.getImportId());
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            LOG.error("Shutdown interrupted. Forcing shutdown...");
 
-            executorService.shutdownNow();
+            LOG.info("<== startImportConsumer(atlasAsyncImportRequest={})", 
importRequest);
         }
-
-        LOG.info("Import processor stopped.");
     }
 
-    @Override
-    public void stop() throws AtlasException {
-        try {
-            stopImport();
-        } finally {
-            releaseAsyncImportSemaphore();
-        }
-    }
+    private void releaseAsyncImportSemaphore() {
+        LOG.info("==> releaseAsyncImportSemaphore()");
 
-    @Override
-    public void instanceIsActive() {
-        LOG.info("Reacting to active state: initializing Kafka consumers");
+        if (asyncImportSemaphore.availablePermits() == 0) {
+            asyncImportSemaphore.release();
 
-        startInternal();
+            LOG.info("<== releaseAsyncImportSemaphore()");
+        } else {
+            LOG.info("<== releaseAsyncImportSemaphore(); no lock held");
+        }
     }
 
-    @Override
-    public void instanceIsPassive() {
+    private void enqueueImportId(String importId) {
         try {
-            stopImport();
-        } finally {
-            releaseAsyncImportSemaphore();
-        }
-    }
+            if (!requestQueue.offer(importId, 5, TimeUnit.SECONDS)) {
+                LOG.warn("populateRequestQueue(): failed to add import {} to 
the queue - enqueue timed out", importId);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
 
-    @Override
-    public int getHandlerOrder() {
-        return 
ActiveStateChangeHandler.HandlerOrder.IMPORT_TASK_LISTENER.getOrder();
+            LOG.error("populateRequestQueue(): Failed to add import {} to the 
queue", importId, e);
+        }
     }
 }
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java 
b/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java
index 86e289f66..4050c8331 100755
--- 
a/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java
@@ -53,11 +53,11 @@ import java.security.UnrecoverableKeyException;
 import java.security.cert.CertificateException;
 import java.util.List;
 
-import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_CIPHER_SUITES;
-import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_PROTOCOLS;
+import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_DEFAULT_PROTOCOL;
 import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_ENABLED_ALGORITHMS;
 import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_ENABLED_PROTOCOLS;
-import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_DEFAULT_PROTOCOL;
+import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_CIPHER_SUITES;
+import static 
org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_PROTOCOLS;
 import static org.apache.atlas.security.SecurityProperties.CLIENT_AUTH_KEY;
 import static 
org.apache.atlas.security.SecurityProperties.DEFATULT_TRUSTORE_FILE_LOCATION;
 import static 
org.apache.atlas.security.SecurityProperties.DEFAULT_CIPHER_SUITES;
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
index da89c90fd..2f5281c0d 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
@@ -36,9 +36,11 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED;
 import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.FAILED;
@@ -60,6 +62,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 public class ImportTaskListenerImplTest {
@@ -516,6 +519,52 @@ public class ImportTaskListenerImplTest {
         verify(asyncImportSemaphore, times(1)).release();
     }
 
+    @Test
+    public void testStartInternalIsNonBlocking() throws InterruptedException {
+        // Setup synchronization latches
+        CountDownLatch populateDoneLatch = new CountDownLatch(1);
+        CountDownLatch startNextStartedLatch = new CountDownLatch(1);
+        CountDownLatch blockStartNextLatch = new CountDownLatch(1);
+        CountDownLatch methodReturnedLatch = new CountDownLatch(1);
+
+        AtomicBoolean populateCompleted = new AtomicBoolean(false);
+
+        ImportTaskListenerImpl importTaskListenerSpy = 
Mockito.spy(importTaskListener);
+
+        // Mock populateRequestQueue()
+        doAnswer(invocation -> {
+            populateCompleted.set(true);
+            populateDoneLatch.countDown();
+            return null;
+        }).when(importTaskListenerSpy).populateRequestQueue();
+
+        // Mock startNextImportInQueue()
+        doAnswer(invocation -> {
+            assertTrue(populateCompleted.get(), "populateRequestQueue must 
finish before startNextImportInQueue");
+            startNextStartedLatch.countDown();
+            blockStartNextLatch.await();  // block until test releases it
+            return null;
+        }).when(importTaskListenerSpy).startNextImportInQueue();
+
+        // Run startInternal() in a separate thread to track non-blocking 
behavior
+        new Thread(() -> {
+            importTaskListenerSpy.startInternal();
+            methodReturnedLatch.countDown();  // signal that method returned
+        }, "test-startInternal-thread").start();
+
+        // Wait for populateRequestQueue() to be called
+        assertTrue(populateDoneLatch.await(1, TimeUnit.SECONDS), 
"populateRequestQueue didn't complete");
+
+        // Wait for startNextImportInQueue() to start (which confirms async 
call happened)
+        assertTrue(startNextStartedLatch.await(1, TimeUnit.SECONDS), 
"startNextImportInQueue didn't start");
+
+        // Ensure startInternal() already returned
+        assertTrue(methodReturnedLatch.await(1, TimeUnit.SECONDS), 
"startInternal() should return promptly");
+
+        // Unblock async method so thread can exit
+        blockStartNextLatch.countDown();
+    }
+
     private void setExecutorServiceAndSemaphore(ImportTaskListenerImpl 
importTaskListener, ExecutorService mockExecutor, Semaphore mockSemaphore) {
         try {
             Field executorField = 
ImportTaskListenerImpl.class.getDeclaredField("executorService");

Reply via email to