This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch AIRAVATA-3981/integration-health-check in repository https://gitbox.apache.org/repos/asf/airavata.git
commit f9a5d4c951634063c2b974adba5da81cfd8ba267 Author: yasithdev <[email protected]> AuthorDate: Thu Mar 26 15:27:34 2026 -0500 feat: add /health/services and /admin/restart endpoints to MonitoringServer --- .../patform/monitoring/MonitoringServer.java | 125 +++++++++++++++++++-- 1 file changed, 115 insertions(+), 10 deletions(-) diff --git a/airavata-api/src/main/java/org/apache/airavata/patform/monitoring/MonitoringServer.java b/airavata-api/src/main/java/org/apache/airavata/patform/monitoring/MonitoringServer.java index 23d90111b7..1f7bac390d 100644 --- a/airavata-api/src/main/java/org/apache/airavata/patform/monitoring/MonitoringServer.java +++ b/airavata-api/src/main/java/org/apache/airavata/patform/monitoring/MonitoringServer.java @@ -19,8 +19,17 @@ */ package org.apache.airavata.patform.monitoring; -import io.prometheus.client.exporter.HTTPServer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.common.TextFormat; import java.io.IOException; +import java.io.OutputStream; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Map; import org.apache.airavata.common.utils.IServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,9 +39,11 @@ public class MonitoringServer implements IServer { private static final Logger logger = LoggerFactory.getLogger(MonitoringServer.class); private static final String SERVER_NAME = "Monitoring Server"; - private String host; - private int port; - private HTTPServer httpServer; + private final String host; + private final int port; + private final ObjectMapper objectMapper = new ObjectMapper(); + private HttpServer httpServer; + private ServiceRegistry serviceRegistry; private ServerStatus status = ServerStatus.STOPPED; public MonitoringServer(String host, int port) { @@ -40,6 +51,10 @@ public class MonitoringServer implements IServer { this.port = port; } + public void setServiceRegistry(ServiceRegistry serviceRegistry) { + this.serviceRegistry = serviceRegistry; + } + @Override public String getName() { return SERVER_NAME; @@ -49,10 +64,18 @@ public class MonitoringServer implements IServer { public void run() { setStatus(ServerStatus.STARTING); try { - logger.info("Starting the monitoring server"); - httpServer = new HTTPServer(host, port, true); + logger.info("Starting the monitoring server on {}:{}", host, port); + httpServer = HttpServer.create(new InetSocketAddress(host, port), 0); + + httpServer.createContext("/metrics", this::handleMetrics); + httpServer.createContext("/health/services", this::handleHealthServices); + httpServer.createContext("/admin/restart/", this::handleAdminRestart); + + httpServer.start(); setStatus(ServerStatus.STARTED); - // HTTPServer is non-blocking; park this thread until interrupted + logger.info("Monitoring server started on {}:{}", host, port); + + // Park thread until interrupted while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(Long.MAX_VALUE); @@ -61,7 +84,7 @@ public class MonitoringServer implements IServer { } } } catch (IOException e) { - logger.error("Failed to start the monitoring server on host {} na port {}", host, port, e); + logger.error("Failed to start the monitoring server on host {} and port {}", host, port, e); setStatus(ServerStatus.FAILED); } } @@ -70,8 +93,8 @@ public class MonitoringServer implements IServer { public void stop() { setStatus(ServerStatus.STOPPING); if (httpServer != null) { - logger.info("Stopping the monitor server"); - httpServer.stop(); + logger.info("Stopping the monitoring server"); + httpServer.stop(0); } setStatus(ServerStatus.STOPPED); } @@ -85,4 +108,86 @@ public class MonitoringServer implements IServer { status = stat; status.updateTime(); } + + private void handleMetrics(HttpExchange exchange) throws IOException { + if (!"GET".equalsIgnoreCase(exchange.getRequestMethod())) { + sendResponse(exchange, 405, "Method Not Allowed"); + return; + } + try { + StringWriter writer = new StringWriter(); + TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples()); + byte[] body = writer.toString().getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().set("Content-Type", TextFormat.CONTENT_TYPE_004); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(body); + } + } catch (Exception e) { + logger.error("Error serving /metrics", e); + sendResponse(exchange, 500, "Internal Server Error"); + } + } + + private void handleHealthServices(HttpExchange exchange) throws IOException { + if (!"GET".equalsIgnoreCase(exchange.getRequestMethod())) { + sendResponse(exchange, 405, "Method Not Allowed"); + return; + } + try { + Map<String, ServiceStatus> statuses = + serviceRegistry != null ? serviceRegistry.getStatuses() : Map.of(); + sendJson(exchange, 200, statuses); + } catch (Exception e) { + logger.error("Error serving /health/services", e); + sendResponse(exchange, 500, "Internal Server Error"); + } + } + + private void handleAdminRestart(HttpExchange exchange) throws IOException { + if (!"POST".equalsIgnoreCase(exchange.getRequestMethod())) { + sendResponse(exchange, 405, "Method Not Allowed"); + return; + } + String path = exchange.getRequestURI().getPath(); + // path is /admin/restart/{name} + String prefix = "/admin/restart/"; + if (!path.startsWith(prefix) || path.length() <= prefix.length()) { + sendJson(exchange, 400, Map.of("error", "Missing service name in path")); + return; + } + String name = path.substring(prefix.length()); + if (serviceRegistry == null) { + sendJson(exchange, 500, Map.of("error", "ServiceRegistry not configured")); + return; + } + try { + serviceRegistry.restart(name); + sendJson(exchange, 200, Map.of("status", "restarted", "service", name)); + } catch (IllegalArgumentException e) { + sendJson(exchange, 404, Map.of("error", e.getMessage())); + } catch (IllegalStateException e) { + sendJson(exchange, 400, Map.of("error", e.getMessage())); + } catch (Exception e) { + logger.error("Error restarting service '{}'", name, e); + sendJson(exchange, 500, Map.of("error", e.getMessage())); + } + } + + private void sendJson(HttpExchange exchange, int statusCode, Object body) throws IOException { + byte[] bytes = objectMapper.writeValueAsBytes(body); + exchange.getResponseHeaders().set("Content-Type", "application/json"); + exchange.sendResponseHeaders(statusCode, bytes.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(bytes); + } + } + + private void sendResponse(HttpExchange exchange, int statusCode, String body) throws IOException { + byte[] bytes = body.getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(statusCode, bytes.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(bytes); + } + } }
