This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch AIRAVATA-3981/integration-health-check in repository https://gitbox.apache.org/repos/asf/airavata.git
commit c0d317ebd9e615c58061d5f9c34df11574862018 Author: yasithdev <[email protected]> AuthorDate: Thu Mar 26 15:27:37 2026 -0500 refactor: use ServiceRegistry in AiravataServer for background service lifecycle --- .../apache/airavata/api/server/AiravataServer.java | 40 +++++++++++++--------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/airavata-thrift-server/src/main/java/org/apache/airavata/api/server/AiravataServer.java b/airavata-thrift-server/src/main/java/org/apache/airavata/api/server/AiravataServer.java index 4d4acf38cf..b3ad7a80f1 100644 --- a/airavata-thrift-server/src/main/java/org/apache/airavata/api/server/AiravataServer.java +++ b/airavata-thrift-server/src/main/java/org/apache/airavata/api/server/AiravataServer.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.function.Supplier; import org.apache.airavata.api.Airavata; import org.apache.airavata.api.server.handler.AiravataServerHandler; import org.apache.airavata.api.server.util.Constants; @@ -51,6 +52,7 @@ import org.apache.airavata.monitor.realtime.RealtimeMonitor; import org.apache.airavata.orchestrator.cpi.OrchestratorService; import org.apache.airavata.orchestrator.server.OrchestratorServerHandler; import org.apache.airavata.patform.monitoring.MonitoringServer; +import org.apache.airavata.patform.monitoring.ServiceRegistry; import org.apache.airavata.registry.api.RegistryService; import org.apache.airavata.registry.api.service.handler.RegistryServerHandler; import org.apache.airavata.registry.core.utils.AppCatalogDBInitConfig; @@ -101,8 +103,7 @@ public class AiravataServer { private static final Logger logger = LoggerFactory.getLogger(AiravataServer.class); private TServer server; - private final List<IServer> backgroundServices = new ArrayList<>(); - private final List<Thread> serviceThreads = new ArrayList<>(); + private final ServiceRegistry serviceRegistry = new ServiceRegistry(); private final List<DBInitConfig> dbInitConfigs = Arrays.asList( new ExpCatalogDBInitConfig(), @@ -239,10 +240,13 @@ public class AiravataServer { if (ServerSettings.getBooleanSetting("api.server.monitoring.enabled")) { String monHost = ServerSettings.getSetting("api.server.monitoring.host", "localhost"); int monPort = Integer.parseInt(ServerSettings.getSetting("api.server.monitoring.port", "9097")); - registerAndStart(new MonitoringServer(monHost, monPort), "monitoring_server"); + MonitoringServer monitoringServer = new MonitoringServer(monHost, monPort); + monitoringServer.setServiceRegistry(serviceRegistry); + registerAndStart(monitoringServer, "monitoring_server"); } } catch (Exception e) { logger.warn(" monitoring_server: config error — {}", e.getMessage()); + serviceRegistry.recordError("monitoring_server", e.getMessage()); } // Cluster status monitoring — polls compute resource queue status @@ -252,6 +256,7 @@ public class AiravataServer { } } catch (Exception e) { logger.warn(" cluster_status_monitor: config error — {}", e.getMessage()); + serviceRegistry.recordError("cluster_status_monitor", e.getMessage()); } // Data interpreter — metadata analysis for submitted jobs @@ -261,6 +266,7 @@ public class AiravataServer { } } catch (Exception e) { logger.warn(" data_interpreter: config error — {}", e.getMessage()); + serviceRegistry.recordError("data_interpreter", e.getMessage()); } // Process rescheduler — retries/reschedules failed processes @@ -270,6 +276,7 @@ public class AiravataServer { } } catch (Exception e) { logger.warn(" process_rescheduler: config error — {}", e.getMessage()); + serviceRegistry.recordError("process_rescheduler", e.getMessage()); } // Execution engine services — controller must initialize the cluster @@ -279,6 +286,7 @@ public class AiravataServer { waitForHelixCluster(); } catch (Exception e) { logger.warn(" helix_controller: config error — {}", e.getMessage()); + serviceRegistry.recordError("helix_controller", e.getMessage()); } try { ArrayList<Class<? extends AbstractTask>> taskClasses = new ArrayList<>(); @@ -288,21 +296,25 @@ public class AiravataServer { registerAndStart(new GlobalParticipant(taskClasses, null), "helix_participant"); } catch (Exception e) { logger.warn(" helix_participant: config error — {}", e.getMessage()); + serviceRegistry.recordError("helix_participant", e.getMessage()); } try { registerAndStart(new PreWorkflowManager(), "pre_workflow_manager"); } catch (Exception e) { logger.warn(" pre_workflow_manager: config error — {}", e.getMessage()); + serviceRegistry.recordError("pre_workflow_manager", e.getMessage()); } try { registerAndStart(new PostWorkflowManager(), "post_workflow_manager"); } catch (Exception e) { logger.warn(" post_workflow_manager: config error — {}", e.getMessage()); + serviceRegistry.recordError("post_workflow_manager", e.getMessage()); } try { registerAndStart(new ParserWorkflowManager(), "parser_workflow_manager"); } catch (Exception e) { logger.warn(" parser_workflow_manager: config error — {}", e.getMessage()); + serviceRegistry.recordError("parser_workflow_manager", e.getMessage()); } // Job monitors @@ -312,14 +324,16 @@ public class AiravataServer { } } catch (Exception e) { logger.warn(" email_monitor: config error — {}", e.getMessage()); + serviceRegistry.recordError("email_monitor", e.getMessage()); } try { registerAndStart(new RealtimeMonitor(), "realtime_monitor"); } catch (Exception e) { logger.warn(" realtime_monitor: config error — {}", e.getMessage()); + serviceRegistry.recordError("realtime_monitor", e.getMessage()); } - logger.info("Background services initialization complete ({} running)", backgroundServices.size()); + logger.info("Background services initialization complete"); } private void waitForHelixCluster() { @@ -347,11 +361,14 @@ public class AiravataServer { } private void registerAndStart(IServer service, String label) { + registerAndStart(service, label, null); + } + + private void registerAndStart(IServer service, String label, Supplier<IServer> factory) { Thread t = new Thread(service, "airavata-" + label); t.setDaemon(true); t.start(); - backgroundServices.add(service); - serviceThreads.add(t); + serviceRegistry.register(label, service, t, factory); logger.info(" {}: started", label); } @@ -359,16 +376,7 @@ public class AiravataServer { if (server != null && server.isServing()) { server.stop(); } - for (IServer service : backgroundServices) { - try { - service.stop(); - } catch (Exception e) { - logger.warn("Error stopping {}: {}", service.getName(), e.getMessage()); - } - } - for (Thread t : serviceThreads) { - t.interrupt(); - } + serviceRegistry.stopAll(); } public static void main(String[] args) {
