This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch feat/airavata-service-layer in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 2860233290b373a57fd393c59988b9b404ae7f5e Author: yasithdev <[email protected]> AuthorDate: Thu Mar 26 12:03:21 2026 -0500 feat: migrate remaining experiment methods to service layer - Add isUserExists to GatewayService - Add fetchIntermediateOutputs, getIntermediateOutputProcessStatus, and launchExperiment to ExperimentService - Add tests for new methods in ExperimentServiceTest and GatewayServiceTest --- .../service/experiment/ExperimentService.java | 217 +++++++++++++++++++++ .../airavata/service/gateway/GatewayService.java | 9 + .../service/experiment/ExperimentServiceTest.java | 107 +++++++++- .../service/gateway/GatewayServiceTest.java | 19 ++ 4 files changed, 351 insertions(+), 1 deletion(-) diff --git a/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java b/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java index b1d61e3125..bf86ff92a0 100644 --- a/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java +++ b/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java @@ -7,10 +7,15 @@ import org.apache.airavata.model.experiment.ExperimentSummaryModel; import org.apache.airavata.model.experiment.ExperimentSearchFields; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; import org.apache.airavata.model.status.ExperimentState; import org.apache.airavata.model.status.ExperimentStatus; +import org.apache.airavata.model.status.JobState; import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.model.workspace.Project; import org.apache.airavata.model.experiment.UserConfigurationDataModel; import org.apache.airavata.registry.api.service.handler.RegistryServerHandler; @@ -28,9 +33,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; public class ExperimentService { @@ -468,6 +477,214 @@ public class ExperimentService { } } + public void fetchIntermediateOutputs(RequestContext ctx, String experimentId, List<String> outputNames) + throws ServiceException { + try { + // Verify that user has WRITE access to experiment + if (!userHasWriteAccess(ctx, experimentId)) { + throw new ServiceAuthorizationException("User does not have WRITE access to this experiment"); + } + + // Verify that the experiment's job is currently ACTIVE + ExperimentModel existingExperiment = registryHandler.getExperiment(experimentId); + List<JobModel> jobs = registryHandler.getJobDetails(experimentId); + boolean anyJobIsActive = jobs.stream().anyMatch(j -> { + if (j.getJobStatusesSize() > 0) { + return j.getJobStatuses().get(j.getJobStatusesSize() - 1).getJobState() == JobState.ACTIVE; + } + return false; + }); + if (!anyJobIsActive) { + throw new ServiceException("Experiment does not have currently ACTIVE job"); + } + + // Check if there are already running intermediate output fetching processes for outputNames + List<ProcessModel> intermediateOutputFetchProcesses = existingExperiment.getProcesses().stream() + .filter(p -> { + if (p.getProcessStatusesSize() > 0) { + ProcessStatus latestStatus = p.getProcessStatuses().get(p.getProcessStatusesSize() - 1); + if (latestStatus.getState() == ProcessState.COMPLETED + || latestStatus.getState() == ProcessState.FAILED) { + return false; + } + } + return true; + }) + .filter(p -> p.getTasks().stream().allMatch(t -> t.getTaskType() == TaskTypes.OUTPUT_FETCHING)) + .filter(p -> p.getProcessOutputs().stream().anyMatch(o -> outputNames.contains(o.getName()))) + .collect(Collectors.toList()); + if (!intermediateOutputFetchProcesses.isEmpty()) { + throw new ServiceException( + "There are already intermediate output fetching tasks running for those outputs."); + } + + eventPublisher.publishIntermediateOutputs(experimentId, ctx.getGatewayId(), outputNames); + } catch (ServiceException e) { + throw e; + } catch (Exception e) { + throw new ServiceException( + "Error while processing request to fetch intermediate outputs for experiment: " + + experimentId + ": " + e.getMessage(), e); + } + } + + public ProcessStatus getIntermediateOutputProcessStatus(RequestContext ctx, String experimentId, + List<String> outputNames) throws ServiceException { + try { + // Verify that user has READ access to experiment + if (!userHasReadAccess(ctx, experimentId)) { + throw new ServiceAuthorizationException("User does not have READ access to this experiment"); + } + + ExperimentModel existingExperiment = registryHandler.getExperiment(experimentId); + + // Find the most recent intermediate output fetching process for the outputNames + Optional<ProcessModel> mostRecentOutputFetchProcess = existingExperiment.getProcesses().stream() + .filter(p -> p.getTasks().stream().allMatch(t -> t.getTaskType() == TaskTypes.OUTPUT_FETCHING)) + .filter(p -> { + List<String> names = p.getProcessOutputs().stream() + .map(o -> o.getName()) + .collect(Collectors.toList()); + return new HashSet<>(names).equals(new HashSet<>(outputNames)); + }) + .sorted(Comparator.comparing(ProcessModel::getLastUpdateTime).reversed()) + .findFirst(); + + if (!mostRecentOutputFetchProcess.isPresent()) { + throw new ServiceException("No matching intermediate output fetching process found."); + } + + ProcessModel process = mostRecentOutputFetchProcess.get(); + if (process.getProcessStatusesSize() > 0) { + return process.getProcessStatuses().get(process.getProcessStatusesSize() - 1); + } else { + return new ProcessStatus(ProcessState.CREATED); + } + } catch (ServiceException e) { + throw e; + } catch (Exception e) { + throw new ServiceException( + "Error while getting intermediate output process status for experiment: " + + experimentId + ": " + e.getMessage(), e); + } + } + + public void launchExperiment(RequestContext ctx, String experimentId, String gatewayId, + List<org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile> accessibleGroupResourceProfiles) + throws ServiceException { + try { + ExperimentModel experiment = registryHandler.getExperiment(experimentId); + if (experiment == null) { + throw new ServiceException("Experiment " + experimentId + " does not exist"); + } + + // For backwards compatibility, if there is no groupResourceProfileId, pick one + if (!experiment.getUserConfigurationData().isSetGroupResourceProfileId()) { + if (!accessibleGroupResourceProfiles.isEmpty()) { + final String groupResourceProfileId = + accessibleGroupResourceProfiles.get(0).getGroupResourceProfileId(); + logger.warn( + "Experiment {} doesn't have groupResourceProfileId, picking first one user has access to: {}", + experimentId, groupResourceProfileId); + experiment.getUserConfigurationData().setGroupResourceProfileId(groupResourceProfileId); + registryHandler.updateExperimentConfiguration( + experimentId, experiment.getUserConfigurationData()); + } else { + throw new ServiceAuthorizationException("User " + ctx.getUserId() + " in gateway " + gatewayId + + " doesn't have access to any group resource profiles."); + } + } + + // Verify user has READ access to groupResourceProfileId + String qualifiedUserId = ctx.getUserId() + "@" + gatewayId; + if (!sharingHandler.userHasAccess( + gatewayId, qualifiedUserId, + experiment.getUserConfigurationData().getGroupResourceProfileId(), + gatewayId + ":READ")) { + throw new ServiceAuthorizationException("User " + ctx.getUserId() + " in gateway " + gatewayId + + " doesn't have access to group resource profile " + + experiment.getUserConfigurationData().getGroupResourceProfileId()); + } + + // Verify user has READ access to Application Deployment + final String appInterfaceId = experiment.getExecutionId(); + org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription applicationInterfaceDescription = + registryHandler.getApplicationInterface(appInterfaceId); + List<String> appModuleIds = applicationInterfaceDescription.getApplicationModules(); + String appModuleId = appModuleIds.get(0); + List<org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription> applicationDeploymentDescriptions = + registryHandler.getApplicationDeployments(appModuleId); + + if (!experiment.getUserConfigurationData().isAiravataAutoSchedule()) { + final String resourceHostId = experiment + .getUserConfigurationData() + .getComputationalResourceScheduling() + .getResourceHostId(); + Optional<org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription> appDeployment = + applicationDeploymentDescriptions.stream() + .filter(dep -> dep.getComputeHostId().equals(resourceHostId)) + .findFirst(); + if (appDeployment.isPresent()) { + final String appDeploymentId = appDeployment.get().getAppDeploymentId(); + if (!sharingHandler.userHasAccess( + gatewayId, qualifiedUserId, appDeploymentId, gatewayId + ":READ")) { + throw new ServiceAuthorizationException("User " + ctx.getUserId() + " in gateway " + gatewayId + + " doesn't have access to app deployment " + appDeploymentId); + } + } else { + throw new ServiceException("Application deployment doesn't exist for application interface " + + appInterfaceId + " and host " + resourceHostId + " in gateway " + gatewayId); + } + } else if (experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList() != null + && !experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList().isEmpty()) { + for (org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel crScheduling + : experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList()) { + Optional<org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription> appDeployment = + applicationDeploymentDescriptions.stream() + .filter(dep -> dep.getComputeHostId().equals(crScheduling.getResourceHostId())) + .findFirst(); + if (appDeployment.isPresent()) { + final String appDeploymentId = appDeployment.get().getAppDeploymentId(); + if (!sharingHandler.userHasAccess( + gatewayId, qualifiedUserId, appDeploymentId, gatewayId + ":READ")) { + throw new ServiceAuthorizationException("User " + ctx.getUserId() + " in gateway " + gatewayId + + " doesn't have access to app deployment " + appDeploymentId); + } + } + } + } + + eventPublisher.publishExperimentLaunch(experimentId, gatewayId); + logger.info("Experiment with ExpId: {} was submitted in gateway: {}", experimentId, gatewayId); + } catch (ServiceException e) { + throw e; + } catch (Exception e) { + throw new ServiceException("Error while launching experiment: " + e.getMessage(), e); + } + } + + private boolean userHasWriteAccess(RequestContext ctx, String entityId) { + String domainId = ctx.getGatewayId(); + String qualifiedUserId = ctx.getUserId() + "@" + domainId; + try { + boolean isOwner = sharingHandler.userHasAccess(domainId, qualifiedUserId, entityId, domainId + ":OWNER"); + return isOwner || sharingHandler.userHasAccess(domainId, qualifiedUserId, entityId, domainId + ":WRITE"); + } catch (Exception e) { + throw new RuntimeException("Unable to check if user has access", e); + } + } + + private boolean userHasReadAccess(RequestContext ctx, String entityId) { + String domainId = ctx.getGatewayId(); + String qualifiedUserId = ctx.getUserId() + "@" + domainId; + try { + boolean isOwner = sharingHandler.userHasAccess(domainId, qualifiedUserId, entityId, domainId + ":OWNER"); + return isOwner || sharingHandler.userHasAccess(domainId, qualifiedUserId, entityId, domainId + ":READ"); + } catch (Exception e) { + throw new RuntimeException("Unable to check if user has access", e); + } + } + private boolean isSharingEnabled() { try { return ServerSettings.isEnableSharing(); diff --git a/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java b/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java index 88fa6bfe3c..979fd55b87 100644 --- a/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java +++ b/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java @@ -156,6 +156,15 @@ public class GatewayService { } } + public boolean isUserExists(RequestContext ctx, String gatewayId, String userName) throws ServiceException { + try { + logger.debug("Checking if the user {} exists in the gateway {}", userName, gatewayId); + return registryHandler.isUserExists(gatewayId, userName); + } catch (Exception e) { + throw new ServiceException("Error while verifying user: " + e.getMessage(), e); + } + } + private boolean isSharingEnabled() { try { return ServerSettings.isEnableSharing(); diff --git a/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java b/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java index a62c77e095..acf27a5d82 100644 --- a/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java +++ b/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java @@ -1,11 +1,20 @@ package org.apache.airavata.service.experiment; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.experiment.ExperimentStatistics; +import org.apache.airavata.model.experiment.UserConfigurationDataModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.status.ExperimentState; import org.apache.airavata.model.status.ExperimentStatus; +import org.apache.airavata.model.status.JobState; import org.apache.airavata.model.status.JobStatus; -import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.task.TaskModel; +import org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.registry.api.service.handler.RegistryServerHandler; import org.apache.airavata.service.context.RequestContext; import org.apache.airavata.service.exception.ServiceAuthorizationException; @@ -177,4 +186,100 @@ class ExperimentServiceTest { boolean result = experimentService.validateExperiment(ctx, "exp-123"); assertTrue(result); } + + @Test + void fetchIntermediateOutputs_throwsWhenNoAccess() throws Exception { + // User doesn't have owner or write access + when(sharingHandler.userHasAccess("testGateway", "testUser@testGateway", "exp-123", "testGateway:OWNER")) + .thenReturn(false); + when(sharingHandler.userHasAccess("testGateway", "testUser@testGateway", "exp-123", "testGateway:WRITE")) + .thenReturn(false); + + assertThrows(ServiceAuthorizationException.class, + () -> experimentService.fetchIntermediateOutputs(ctx, "exp-123", List.of("output1"))); + } + + @Test + void fetchIntermediateOutputs_throwsWhenNoActiveJob() throws Exception { + // User has write access + when(sharingHandler.userHasAccess("testGateway", "testUser@testGateway", "exp-123", "testGateway:OWNER")) + .thenReturn(true); + + ExperimentModel experiment = new ExperimentModel(); + experiment.setUserName("testUser"); + experiment.setGatewayId("testGateway"); + experiment.setProcesses(new java.util.ArrayList<>()); + when(registryHandler.getExperiment("exp-123")).thenReturn(experiment); + + // No active jobs + JobModel job = new JobModel(); + JobStatus jobStatus = new JobStatus(JobState.COMPLETE); + job.addToJobStatuses(jobStatus); + when(registryHandler.getJobDetails("exp-123")).thenReturn(List.of(job)); + + assertThrows(ServiceException.class, + () -> experimentService.fetchIntermediateOutputs(ctx, "exp-123", List.of("output1"))); + } + + @Test + void fetchIntermediateOutputs_publishesEventWhenValid() throws Exception { + when(sharingHandler.userHasAccess("testGateway", "testUser@testGateway", "exp-123", "testGateway:OWNER")) + .thenReturn(true); + + ExperimentModel experiment = new ExperimentModel(); + experiment.setUserName("testUser"); + experiment.setGatewayId("testGateway"); + experiment.setProcesses(new java.util.ArrayList<>()); // no in-progress output fetch processes + when(registryHandler.getExperiment("exp-123")).thenReturn(experiment); + + JobModel job = new JobModel(); + JobStatus jobStatus = new JobStatus(JobState.ACTIVE); + job.addToJobStatuses(jobStatus); + when(registryHandler.getJobDetails("exp-123")).thenReturn(List.of(job)); + + experimentService.fetchIntermediateOutputs(ctx, "exp-123", List.of("output1")); + + verify(eventPublisher).publishIntermediateOutputs("exp-123", "testGateway", List.of("output1")); + } + + @Test + void getIntermediateOutputProcessStatus_throwsWhenNoAccess() throws Exception { + when(sharingHandler.userHasAccess("testGateway", "testUser@testGateway", "exp-123", "testGateway:OWNER")) + .thenReturn(false); + when(sharingHandler.userHasAccess("testGateway", "testUser@testGateway", "exp-123", "testGateway:READ")) + .thenReturn(false); + + assertThrows(ServiceAuthorizationException.class, + () -> experimentService.getIntermediateOutputProcessStatus(ctx, "exp-123", List.of("output1"))); + } + + @Test + void getIntermediateOutputProcessStatus_returnsLatestStatus() throws Exception { + when(sharingHandler.userHasAccess("testGateway", "testUser@testGateway", "exp-123", "testGateway:OWNER")) + .thenReturn(true); + + ExperimentModel experiment = new ExperimentModel(); + experiment.setUserName("testUser"); + experiment.setGatewayId("testGateway"); + + // Build a process with OUTPUT_FETCHING task and a matching output + ProcessModel process = new ProcessModel(); + process.setLastUpdateTime(1000L); + TaskModel task = new TaskModel(); + task.setTaskType(TaskTypes.OUTPUT_FETCHING); + process.addToTasks(task); + OutputDataObjectType out = new OutputDataObjectType(); + out.setName("output1"); + process.addToProcessOutputs(out); + ProcessStatus ps = new ProcessStatus(ProcessState.EXECUTING); + process.addToProcessStatuses(ps); + experiment.addToProcesses(process); + + when(registryHandler.getExperiment("exp-123")).thenReturn(experiment); + + ProcessStatus result = experimentService.getIntermediateOutputProcessStatus( + ctx, "exp-123", List.of("output1")); + + assertEquals(ProcessState.EXECUTING, result.getState()); + } } diff --git a/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java b/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java index 4e3016bae6..7d68a2f1f2 100644 --- a/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java +++ b/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java @@ -119,4 +119,23 @@ class GatewayServiceTest { assertEquals(2, result.size()); verify(registryHandler).getAllUsersInGateway("gw-1"); } + + @Test + void isUserExists_returnsTrueWhenExists() throws Exception { + when(registryHandler.isUserExists("gw-1", "alice")).thenReturn(true); + + boolean result = gatewayService.isUserExists(ctx, "gw-1", "alice"); + + assertTrue(result); + verify(registryHandler).isUserExists("gw-1", "alice"); + } + + @Test + void isUserExists_returnsFalseWhenNotExists() throws Exception { + when(registryHandler.isUserExists("gw-1", "nobody")).thenReturn(false); + + boolean result = gatewayService.isUserExists(ctx, "gw-1", "nobody"); + + assertFalse(result); + } }
