This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 873a6fb0a82ce6dfd5d4b4d9e9394aaeb787f9d0 Author: jackhalfalltrades <chandrakanthperave...@gmail.com> AuthorDate: Wed Jun 25 23:05:58 2025 -0500 ATLAS-5061: Fix delay in startup by making the import startup asynchronous (#390) (cherry picked from commit ebf4253c5973e0a76fe4ccfd9601905a20c79b6b) --- .../apache/atlas/security/SecurityProperties.java | 2 +- .../atlas/notification/ImportTaskListenerImpl.java | 255 ++++++++++----------- .../atlas/web/service/SecureEmbeddedServer.java | 6 +- .../notification/ImportTaskListenerImplTest.java | 49 ++++ 4 files changed, 168 insertions(+), 144 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java b/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java index 5bfbe8915..912434c04 100644 --- a/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java +++ b/intg/src/main/java/org/apache/atlas/security/SecurityProperties.java @@ -45,7 +45,7 @@ public final class SecurityProperties { public static final String ATLAS_SSL_EXCLUDE_PROTOCOLS = "atlas.ssl.exclude.protocols"; public static final String ATLAS_SSL_ENABLED_PROTOCOLS = "atlas.ssl.enabled.protocols"; public static final String[] DEFAULT_EXCLUDE_PROTOCOLS = new String[] {"TLSv1", "TLSv1.1"}; - public static final String[] ATLAS_SSL_DEFAULT_PROTOCOL = new String[] { "TLSv1.2" }; + public static final String[] ATLAS_SSL_DEFAULT_PROTOCOL = new String[] {"TLSv1.2"}; private SecurityProperties() { } diff --git a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java index befa79751..c7489debe 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java +++ b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java @@ -49,6 +49,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.apache.atlas.AtlasConfiguration.ASYNC_IMPORT_TOPIC_PREFIX; import static org.apache.atlas.AtlasErrorCode.IMPORT_QUEUEING_FAILED; @@ -95,26 +96,34 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler startInternal(); } - private void startInternal() { - CompletableFuture<Void> populateTask = CompletableFuture.runAsync(this::populateRequestQueue) - .exceptionally(ex -> { - LOG.error("Failed to populate request queue", ex); - return null; - }); - - CompletableFuture<Void> resumeTask = CompletableFuture.runAsync(this::resumeInProgressImports) - .exceptionally(ex -> { - LOG.error("Failed to resume in-progress imports", ex); - return null; - }); - - // Wait for both tasks to complete before proceeding - CompletableFuture.allOf(populateTask, resumeTask) - .thenRun(this::startNextImportInQueue) - .exceptionally(ex -> { - LOG.error("Failed to start next import in queue", ex); - return null; - }).join(); + @Override + public void stop() throws AtlasException { + try { + stopImport(); + } finally { + releaseAsyncImportSemaphore(); + } + } + + @Override + public void instanceIsActive() { + LOG.info("Reacting to active state: initializing Kafka consumers"); + + startInternal(); + } + + @Override + public void instanceIsPassive() { + try { + stopImport(); + } finally { + releaseAsyncImportSemaphore(); + } + } + + @Override + public int getHandlerOrder() { + return ActiveStateChangeHandler.HandlerOrder.IMPORT_TASK_LISTENER.getOrder(); } @Override @@ -153,7 +162,50 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler } } - private void startNextImportInQueue() { + @PreDestroy + public void stopImport() { + LOG.info("Shutting down import processor..."); + + executorService.shutdown(); // Initiate an orderly shutdown + + try { + if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("Executor service did not terminate gracefully within the timeout. Waiting longer..."); + + // Retry shutdown before forcing it + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("Forcing shutdown..."); + + executorService.shutdownNow(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + LOG.error("Shutdown interrupted. Forcing shutdown..."); + + executorService.shutdownNow(); + } + + LOG.info("Import processor stopped."); + } + + @VisibleForTesting + void startInternal() { + populateRequestQueue(); + + if (!requestQueue.isEmpty()) { + CompletableFuture.runAsync(this::startNextImportInQueue) + .exceptionally(ex -> { + LOG.error("Failed to start next import in queue", ex); + + return null; + }); + } + } + + @VisibleForTesting + void startNextImportInQueue() { LOG.info("==> startNextImportInQueue()"); startAsyncImportIfAvailable(null); @@ -176,6 +228,7 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler if (isNotValidImportRequest(nextImport)) { releaseAsyncImportSemaphore(); + return; } @@ -189,36 +242,12 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler } } - private void startImportConsumer(AtlasAsyncImportRequest importRequest) { - try { - LOG.info("==> startImportConsumer(atlasAsyncImportRequest={})", importRequest); - - notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, importRequest.getImportId(), importRequest.getTopicName()); - - importRequest.setStatus(ImportStatus.PROCESSING); - importRequest.setProcessingStartTime(System.currentTimeMillis()); - } catch (Exception e) { - LOG.error("Failed to start consumer for import: {}, marking import as failed", importRequest, e); - - importRequest.setStatus(ImportStatus.FAILED); - } finally { - asyncImportService.updateImportRequest(importRequest); - - if (ObjectUtils.equals(importRequest.getStatus(), ImportStatus.FAILED)) { - onCompleteImportRequest(importRequest.getImportId()); - } - - LOG.info("<== startImportConsumer(atlasAsyncImportRequest={})", importRequest); - } - } - @VisibleForTesting AtlasAsyncImportRequest getNextImportFromQueue() { LOG.info("==> getNextImportFromQueue()"); - final int maxRetries = 5; - int retryCount = 0; - AtlasAsyncImportRequest nextImport = null; + final int maxRetries = 5; + int retryCount = 0; while (retryCount < maxRetries) { try { @@ -248,8 +277,10 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler return importRequest; } catch (InterruptedException e) { LOG.error("Thread interrupted while waiting for importId from the queue", e); + // Restore the interrupt flag Thread.currentThread().interrupt(); + return null; } } @@ -265,123 +296,67 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler (!ImportStatus.WAITING.equals(importRequest.getStatus()) && !ImportStatus.PROCESSING.equals(importRequest.getStatus())); } - private void releaseAsyncImportSemaphore() { - LOG.info("==> releaseAsyncImportSemaphore()"); - - if (asyncImportSemaphore.availablePermits() == 0) { - asyncImportSemaphore.release(); - - LOG.info("<== releaseAsyncImportSemaphore()"); - } else { - LOG.info("<== releaseAsyncImportSemaphore(); no lock held"); - } - } - void populateRequestQueue() { LOG.info("==> populateRequestQueue()"); - List<String> importRequests = asyncImportService.fetchQueuedImportRequests(); + List<String> queuedImports = asyncImportService.fetchQueuedImportRequests(); + List<String> inProgressImports = asyncImportService.fetchInProgressImportIds(); - try { - if (!importRequests.isEmpty()) { - for (String request : importRequests) { - try { - if (!requestQueue.offer(request, 5, TimeUnit.SECONDS)) { // Wait up to 5 sec - LOG.warn("populateRequestQueue(): Request {} could not be added to the queue", request); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - LOG.error("populateRequestQueue(): Failed to add requests to queue"); - - break; // Exit loop on interruption - } - } + if (queuedImports.isEmpty() && inProgressImports.isEmpty()) { + LOG.info("populateRequestQueue(): no queued asynchronous import requests found."); + } else { + LOG.info("populateRequestQueue(): loaded {} asynchronous import requests (in-progress={}, queued={})", (inProgressImports.size() + queuedImports.size()), inProgressImports.size(), queuedImports.size()); - LOG.info("populateRequestQueue(): Added {} requests to queue", importRequests.size()); - } else { - LOG.warn("populateRequestQueue(): No queued requests found."); - } - } finally { - LOG.info("<== populateRequestQueue()"); + Stream.concat(inProgressImports.stream(), queuedImports.stream()).forEach(this::enqueueImportId); } - } - private void resumeInProgressImports() { - LOG.info("==> resumeInProgressImports()"); + LOG.info("<== populateRequestQueue()"); + } + private void startImportConsumer(AtlasAsyncImportRequest importRequest) { try { - String importId = asyncImportService.fetchInProgressImportIds().stream().findFirst().orElse(null); - - if (importId == null) { - LOG.warn("No imports found to resume"); + LOG.info("==> startImportConsumer(atlasAsyncImportRequest={})", importRequest); - return; - } + notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, importRequest.getImportId(), importRequest.getTopicName()); - LOG.info("Resuming import id={}", importId); + importRequest.setStatus(ImportStatus.PROCESSING); + importRequest.setProcessingStartTime(System.currentTimeMillis()); + } catch (Exception e) { + LOG.error("Failed to start consumer for import: {}, marking import as failed", importRequest, e); - startAsyncImportIfAvailable(importId); + importRequest.setStatus(ImportStatus.FAILED); } finally { - LOG.info("<== resumeInProgressImports()"); - } - } - - @PreDestroy - public void stopImport() { - LOG.info("Shutting down import processor..."); - - executorService.shutdown(); // Initiate an orderly shutdown - - try { - if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { - LOG.warn("Executor service did not terminate gracefully within the timeout. Waiting longer..."); - - // Retry shutdown before forcing it - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { - LOG.warn("Forcing shutdown..."); + asyncImportService.updateImportRequest(importRequest); - executorService.shutdownNow(); - } + if (ObjectUtils.equals(importRequest.getStatus(), ImportStatus.FAILED)) { + onCompleteImportRequest(importRequest.getImportId()); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - LOG.error("Shutdown interrupted. Forcing shutdown..."); - executorService.shutdownNow(); + LOG.info("<== startImportConsumer(atlasAsyncImportRequest={})", importRequest); } - - LOG.info("Import processor stopped."); } - @Override - public void stop() throws AtlasException { - try { - stopImport(); - } finally { - releaseAsyncImportSemaphore(); - } - } + private void releaseAsyncImportSemaphore() { + LOG.info("==> releaseAsyncImportSemaphore()"); - @Override - public void instanceIsActive() { - LOG.info("Reacting to active state: initializing Kafka consumers"); + if (asyncImportSemaphore.availablePermits() == 0) { + asyncImportSemaphore.release(); - startInternal(); + LOG.info("<== releaseAsyncImportSemaphore()"); + } else { + LOG.info("<== releaseAsyncImportSemaphore(); no lock held"); + } } - @Override - public void instanceIsPassive() { + private void enqueueImportId(String importId) { try { - stopImport(); - } finally { - releaseAsyncImportSemaphore(); - } - } + if (!requestQueue.offer(importId, 5, TimeUnit.SECONDS)) { + LOG.warn("populateRequestQueue(): failed to add import {} to the queue - enqueue timed out", importId); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); - @Override - public int getHandlerOrder() { - return ActiveStateChangeHandler.HandlerOrder.IMPORT_TASK_LISTENER.getOrder(); + LOG.error("populateRequestQueue(): Failed to add import {} to the queue", importId, e); + } } } diff --git a/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java index 86e289f66..4050c8331 100755 --- a/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/SecureEmbeddedServer.java @@ -53,11 +53,11 @@ import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.util.List; -import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_CIPHER_SUITES; -import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_PROTOCOLS; +import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_DEFAULT_PROTOCOL; import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_ENABLED_ALGORITHMS; import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_ENABLED_PROTOCOLS; -import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_DEFAULT_PROTOCOL; +import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_CIPHER_SUITES; +import static org.apache.atlas.security.SecurityProperties.ATLAS_SSL_EXCLUDE_PROTOCOLS; import static org.apache.atlas.security.SecurityProperties.CLIENT_AUTH_KEY; import static org.apache.atlas.security.SecurityProperties.DEFATULT_TRUSTORE_FILE_LOCATION; import static org.apache.atlas.security.SecurityProperties.DEFAULT_CIPHER_SUITES; diff --git a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java index da89c90fd..2f5281c0d 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java @@ -36,9 +36,11 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED; import static org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.FAILED; @@ -60,6 +62,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; public class ImportTaskListenerImplTest { @@ -516,6 +519,52 @@ public class ImportTaskListenerImplTest { verify(asyncImportSemaphore, times(1)).release(); } + @Test + public void testStartInternalIsNonBlocking() throws InterruptedException { + // Setup synchronization latches + CountDownLatch populateDoneLatch = new CountDownLatch(1); + CountDownLatch startNextStartedLatch = new CountDownLatch(1); + CountDownLatch blockStartNextLatch = new CountDownLatch(1); + CountDownLatch methodReturnedLatch = new CountDownLatch(1); + + AtomicBoolean populateCompleted = new AtomicBoolean(false); + + ImportTaskListenerImpl importTaskListenerSpy = Mockito.spy(importTaskListener); + + // Mock populateRequestQueue() + doAnswer(invocation -> { + populateCompleted.set(true); + populateDoneLatch.countDown(); + return null; + }).when(importTaskListenerSpy).populateRequestQueue(); + + // Mock startNextImportInQueue() + doAnswer(invocation -> { + assertTrue(populateCompleted.get(), "populateRequestQueue must finish before startNextImportInQueue"); + startNextStartedLatch.countDown(); + blockStartNextLatch.await(); // block until test releases it + return null; + }).when(importTaskListenerSpy).startNextImportInQueue(); + + // Run startInternal() in a separate thread to track non-blocking behavior + new Thread(() -> { + importTaskListenerSpy.startInternal(); + methodReturnedLatch.countDown(); // signal that method returned + }, "test-startInternal-thread").start(); + + // Wait for populateRequestQueue() to be called + assertTrue(populateDoneLatch.await(1, TimeUnit.SECONDS), "populateRequestQueue didn't complete"); + + // Wait for startNextImportInQueue() to start (which confirms async call happened) + assertTrue(startNextStartedLatch.await(1, TimeUnit.SECONDS), "startNextImportInQueue didn't start"); + + // Ensure startInternal() already returned + assertTrue(methodReturnedLatch.await(1, TimeUnit.SECONDS), "startInternal() should return promptly"); + + // Unblock async method so thread can exit + blockStartNextLatch.countDown(); + } + private void setExecutorServiceAndSemaphore(ImportTaskListenerImpl importTaskListener, ExecutorService mockExecutor, Semaphore mockSemaphore) { try { Field executorField = ImportTaskListenerImpl.class.getDeclaredField("executorService");