This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch feat/airavata-service-layer
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 2860233290b373a57fd393c59988b9b404ae7f5e
Author: yasithdev <[email protected]>
AuthorDate: Thu Mar 26 12:03:21 2026 -0500

    feat: migrate remaining experiment methods to service layer
    
    - Add isUserExists to GatewayService
    - Add fetchIntermediateOutputs, getIntermediateOutputProcessStatus, and 
launchExperiment to ExperimentService
    - Add tests for new methods in ExperimentServiceTest and GatewayServiceTest
---
 .../service/experiment/ExperimentService.java      | 217 +++++++++++++++++++++
 .../airavata/service/gateway/GatewayService.java   |   9 +
 .../service/experiment/ExperimentServiceTest.java  | 107 +++++++++-
 .../service/gateway/GatewayServiceTest.java        |  19 ++
 4 files changed, 351 insertions(+), 1 deletion(-)

diff --git 
a/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java
 
b/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java
index b1d61e3125..bf86ff92a0 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/service/experiment/ExperimentService.java
@@ -7,10 +7,15 @@ import 
org.apache.airavata.model.experiment.ExperimentSummaryModel;
 import org.apache.airavata.model.experiment.ExperimentSearchFields;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.process.ProcessModel;
 import 
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.model.status.ExperimentState;
 import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.model.workspace.Project;
 import org.apache.airavata.model.experiment.UserConfigurationDataModel;
 import org.apache.airavata.registry.api.service.handler.RegistryServerHandler;
@@ -28,9 +33,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class ExperimentService {
 
@@ -468,6 +477,214 @@ public class ExperimentService {
         }
     }
 
+    public void fetchIntermediateOutputs(RequestContext ctx, String 
experimentId, List<String> outputNames)
+            throws ServiceException {
+        try {
+            // Verify that user has WRITE access to experiment
+            if (!userHasWriteAccess(ctx, experimentId)) {
+                throw new ServiceAuthorizationException("User does not have 
WRITE access to this experiment");
+            }
+
+            // Verify that the experiment's job is currently ACTIVE
+            ExperimentModel existingExperiment = 
registryHandler.getExperiment(experimentId);
+            List<JobModel> jobs = registryHandler.getJobDetails(experimentId);
+            boolean anyJobIsActive = jobs.stream().anyMatch(j -> {
+                if (j.getJobStatusesSize() > 0) {
+                    return j.getJobStatuses().get(j.getJobStatusesSize() - 
1).getJobState() == JobState.ACTIVE;
+                }
+                return false;
+            });
+            if (!anyJobIsActive) {
+                throw new ServiceException("Experiment does not have currently 
ACTIVE job");
+            }
+
+            // Check if there are already running intermediate output fetching 
processes for outputNames
+            List<ProcessModel> intermediateOutputFetchProcesses = 
existingExperiment.getProcesses().stream()
+                    .filter(p -> {
+                        if (p.getProcessStatusesSize() > 0) {
+                            ProcessStatus latestStatus = 
p.getProcessStatuses().get(p.getProcessStatusesSize() - 1);
+                            if (latestStatus.getState() == 
ProcessState.COMPLETED
+                                    || latestStatus.getState() == 
ProcessState.FAILED) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    })
+                    .filter(p -> p.getTasks().stream().allMatch(t -> 
t.getTaskType() == TaskTypes.OUTPUT_FETCHING))
+                    .filter(p -> p.getProcessOutputs().stream().anyMatch(o -> 
outputNames.contains(o.getName())))
+                    .collect(Collectors.toList());
+            if (!intermediateOutputFetchProcesses.isEmpty()) {
+                throw new ServiceException(
+                        "There are already intermediate output fetching tasks 
running for those outputs.");
+            }
+
+            eventPublisher.publishIntermediateOutputs(experimentId, 
ctx.getGatewayId(), outputNames);
+        } catch (ServiceException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new ServiceException(
+                    "Error while processing request to fetch intermediate 
outputs for experiment: "
+                            + experimentId + ": " + e.getMessage(), e);
+        }
+    }
+
+    public ProcessStatus getIntermediateOutputProcessStatus(RequestContext 
ctx, String experimentId,
+            List<String> outputNames) throws ServiceException {
+        try {
+            // Verify that user has READ access to experiment
+            if (!userHasReadAccess(ctx, experimentId)) {
+                throw new ServiceAuthorizationException("User does not have 
READ access to this experiment");
+            }
+
+            ExperimentModel existingExperiment = 
registryHandler.getExperiment(experimentId);
+
+            // Find the most recent intermediate output fetching process for 
the outputNames
+            Optional<ProcessModel> mostRecentOutputFetchProcess = 
existingExperiment.getProcesses().stream()
+                    .filter(p -> p.getTasks().stream().allMatch(t -> 
t.getTaskType() == TaskTypes.OUTPUT_FETCHING))
+                    .filter(p -> {
+                        List<String> names = p.getProcessOutputs().stream()
+                                .map(o -> o.getName())
+                                .collect(Collectors.toList());
+                        return new HashSet<>(names).equals(new 
HashSet<>(outputNames));
+                    })
+                    
.sorted(Comparator.comparing(ProcessModel::getLastUpdateTime).reversed())
+                    .findFirst();
+
+            if (!mostRecentOutputFetchProcess.isPresent()) {
+                throw new ServiceException("No matching intermediate output 
fetching process found.");
+            }
+
+            ProcessModel process = mostRecentOutputFetchProcess.get();
+            if (process.getProcessStatusesSize() > 0) {
+                return 
process.getProcessStatuses().get(process.getProcessStatusesSize() - 1);
+            } else {
+                return new ProcessStatus(ProcessState.CREATED);
+            }
+        } catch (ServiceException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new ServiceException(
+                    "Error while getting intermediate output process status 
for experiment: "
+                            + experimentId + ": " + e.getMessage(), e);
+        }
+    }
+
+    public void launchExperiment(RequestContext ctx, String experimentId, 
String gatewayId,
+            
List<org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile>
 accessibleGroupResourceProfiles)
+            throws ServiceException {
+        try {
+            ExperimentModel experiment = 
registryHandler.getExperiment(experimentId);
+            if (experiment == null) {
+                throw new ServiceException("Experiment " + experimentId + " 
does not exist");
+            }
+
+            // For backwards compatibility, if there is no 
groupResourceProfileId, pick one
+            if 
(!experiment.getUserConfigurationData().isSetGroupResourceProfileId()) {
+                if (!accessibleGroupResourceProfiles.isEmpty()) {
+                    final String groupResourceProfileId =
+                            
accessibleGroupResourceProfiles.get(0).getGroupResourceProfileId();
+                    logger.warn(
+                            "Experiment {} doesn't have 
groupResourceProfileId, picking first one user has access to: {}",
+                            experimentId, groupResourceProfileId);
+                    
experiment.getUserConfigurationData().setGroupResourceProfileId(groupResourceProfileId);
+                    registryHandler.updateExperimentConfiguration(
+                            experimentId, 
experiment.getUserConfigurationData());
+                } else {
+                    throw new ServiceAuthorizationException("User " + 
ctx.getUserId() + " in gateway " + gatewayId
+                            + " doesn't have access to any group resource 
profiles.");
+                }
+            }
+
+            // Verify user has READ access to groupResourceProfileId
+            String qualifiedUserId = ctx.getUserId() + "@" + gatewayId;
+            if (!sharingHandler.userHasAccess(
+                    gatewayId, qualifiedUserId,
+                    
experiment.getUserConfigurationData().getGroupResourceProfileId(),
+                    gatewayId + ":READ")) {
+                throw new ServiceAuthorizationException("User " + 
ctx.getUserId() + " in gateway " + gatewayId
+                        + " doesn't have access to group resource profile "
+                        + 
experiment.getUserConfigurationData().getGroupResourceProfileId());
+            }
+
+            // Verify user has READ access to Application Deployment
+            final String appInterfaceId = experiment.getExecutionId();
+            
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription
 applicationInterfaceDescription =
+                    registryHandler.getApplicationInterface(appInterfaceId);
+            List<String> appModuleIds = 
applicationInterfaceDescription.getApplicationModules();
+            String appModuleId = appModuleIds.get(0);
+            
List<org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription>
 applicationDeploymentDescriptions =
+                    registryHandler.getApplicationDeployments(appModuleId);
+
+            if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
+                final String resourceHostId = experiment
+                        .getUserConfigurationData()
+                        .getComputationalResourceScheduling()
+                        .getResourceHostId();
+                
Optional<org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription>
 appDeployment =
+                        applicationDeploymentDescriptions.stream()
+                                .filter(dep -> 
dep.getComputeHostId().equals(resourceHostId))
+                                .findFirst();
+                if (appDeployment.isPresent()) {
+                    final String appDeploymentId = 
appDeployment.get().getAppDeploymentId();
+                    if (!sharingHandler.userHasAccess(
+                            gatewayId, qualifiedUserId, appDeploymentId, 
gatewayId + ":READ")) {
+                        throw new ServiceAuthorizationException("User " + 
ctx.getUserId() + " in gateway " + gatewayId
+                                + " doesn't have access to app deployment " + 
appDeploymentId);
+                    }
+                } else {
+                    throw new ServiceException("Application deployment doesn't 
exist for application interface "
+                            + appInterfaceId + " and host " + resourceHostId + 
" in gateway " + gatewayId);
+                }
+            } else if 
(experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList()
 != null
+                    && 
!experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList().isEmpty())
 {
+                for 
(org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel 
crScheduling
+                        : 
experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList())
 {
+                    
Optional<org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription>
 appDeployment =
+                            applicationDeploymentDescriptions.stream()
+                                    .filter(dep -> 
dep.getComputeHostId().equals(crScheduling.getResourceHostId()))
+                                    .findFirst();
+                    if (appDeployment.isPresent()) {
+                        final String appDeploymentId = 
appDeployment.get().getAppDeploymentId();
+                        if (!sharingHandler.userHasAccess(
+                                gatewayId, qualifiedUserId, appDeploymentId, 
gatewayId + ":READ")) {
+                            throw new ServiceAuthorizationException("User " + 
ctx.getUserId() + " in gateway " + gatewayId
+                                    + " doesn't have access to app deployment 
" + appDeploymentId);
+                        }
+                    }
+                }
+            }
+
+            eventPublisher.publishExperimentLaunch(experimentId, gatewayId);
+            logger.info("Experiment with ExpId: {} was submitted in gateway: 
{}", experimentId, gatewayId);
+        } catch (ServiceException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new ServiceException("Error while launching experiment: " + 
e.getMessage(), e);
+        }
+    }
+
+    private boolean userHasWriteAccess(RequestContext ctx, String entityId) {
+        String domainId = ctx.getGatewayId();
+        String qualifiedUserId = ctx.getUserId() + "@" + domainId;
+        try {
+            boolean isOwner = sharingHandler.userHasAccess(domainId, 
qualifiedUserId, entityId, domainId + ":OWNER");
+            return isOwner || sharingHandler.userHasAccess(domainId, 
qualifiedUserId, entityId, domainId + ":WRITE");
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to check if user has access", 
e);
+        }
+    }
+
+    private boolean userHasReadAccess(RequestContext ctx, String entityId) {
+        String domainId = ctx.getGatewayId();
+        String qualifiedUserId = ctx.getUserId() + "@" + domainId;
+        try {
+            boolean isOwner = sharingHandler.userHasAccess(domainId, 
qualifiedUserId, entityId, domainId + ":OWNER");
+            return isOwner || sharingHandler.userHasAccess(domainId, 
qualifiedUserId, entityId, domainId + ":READ");
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to check if user has access", 
e);
+        }
+    }
+
     private boolean isSharingEnabled() {
         try {
             return ServerSettings.isEnableSharing();
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java
 
b/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java
index 88fa6bfe3c..979fd55b87 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/service/gateway/GatewayService.java
@@ -156,6 +156,15 @@ public class GatewayService {
         }
     }
 
+    public boolean isUserExists(RequestContext ctx, String gatewayId, String 
userName) throws ServiceException {
+        try {
+            logger.debug("Checking if the user {} exists in the gateway {}", 
userName, gatewayId);
+            return registryHandler.isUserExists(gatewayId, userName);
+        } catch (Exception e) {
+            throw new ServiceException("Error while verifying user: " + 
e.getMessage(), e);
+        }
+    }
+
     private boolean isSharingEnabled() {
         try {
             return ServerSettings.isEnableSharing();
diff --git 
a/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java
 
b/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java
index a62c77e095..acf27a5d82 100644
--- 
a/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java
+++ 
b/airavata-api/src/test/java/org/apache/airavata/service/experiment/ExperimentServiceTest.java
@@ -1,11 +1,20 @@
 package org.apache.airavata.service.experiment;
 
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.experiment.ExperimentStatistics;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.status.ExperimentState;
 import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.api.service.handler.RegistryServerHandler;
 import org.apache.airavata.service.context.RequestContext;
 import org.apache.airavata.service.exception.ServiceAuthorizationException;
@@ -177,4 +186,100 @@ class ExperimentServiceTest {
         boolean result = experimentService.validateExperiment(ctx, "exp-123");
         assertTrue(result);
     }
+
+    @Test
+    void fetchIntermediateOutputs_throwsWhenNoAccess() throws Exception {
+        // User doesn't have owner or write access
+        when(sharingHandler.userHasAccess("testGateway", 
"testUser@testGateway", "exp-123", "testGateway:OWNER"))
+                .thenReturn(false);
+        when(sharingHandler.userHasAccess("testGateway", 
"testUser@testGateway", "exp-123", "testGateway:WRITE"))
+                .thenReturn(false);
+
+        assertThrows(ServiceAuthorizationException.class,
+                () -> experimentService.fetchIntermediateOutputs(ctx, 
"exp-123", List.of("output1")));
+    }
+
+    @Test
+    void fetchIntermediateOutputs_throwsWhenNoActiveJob() throws Exception {
+        // User has write access
+        when(sharingHandler.userHasAccess("testGateway", 
"testUser@testGateway", "exp-123", "testGateway:OWNER"))
+                .thenReturn(true);
+
+        ExperimentModel experiment = new ExperimentModel();
+        experiment.setUserName("testUser");
+        experiment.setGatewayId("testGateway");
+        experiment.setProcesses(new java.util.ArrayList<>());
+        when(registryHandler.getExperiment("exp-123")).thenReturn(experiment);
+
+        // No active jobs
+        JobModel job = new JobModel();
+        JobStatus jobStatus = new JobStatus(JobState.COMPLETE);
+        job.addToJobStatuses(jobStatus);
+        
when(registryHandler.getJobDetails("exp-123")).thenReturn(List.of(job));
+
+        assertThrows(ServiceException.class,
+                () -> experimentService.fetchIntermediateOutputs(ctx, 
"exp-123", List.of("output1")));
+    }
+
+    @Test
+    void fetchIntermediateOutputs_publishesEventWhenValid() throws Exception {
+        when(sharingHandler.userHasAccess("testGateway", 
"testUser@testGateway", "exp-123", "testGateway:OWNER"))
+                .thenReturn(true);
+
+        ExperimentModel experiment = new ExperimentModel();
+        experiment.setUserName("testUser");
+        experiment.setGatewayId("testGateway");
+        experiment.setProcesses(new java.util.ArrayList<>());  // no 
in-progress output fetch processes
+        when(registryHandler.getExperiment("exp-123")).thenReturn(experiment);
+
+        JobModel job = new JobModel();
+        JobStatus jobStatus = new JobStatus(JobState.ACTIVE);
+        job.addToJobStatuses(jobStatus);
+        
when(registryHandler.getJobDetails("exp-123")).thenReturn(List.of(job));
+
+        experimentService.fetchIntermediateOutputs(ctx, "exp-123", 
List.of("output1"));
+
+        verify(eventPublisher).publishIntermediateOutputs("exp-123", 
"testGateway", List.of("output1"));
+    }
+
+    @Test
+    void getIntermediateOutputProcessStatus_throwsWhenNoAccess() throws 
Exception {
+        when(sharingHandler.userHasAccess("testGateway", 
"testUser@testGateway", "exp-123", "testGateway:OWNER"))
+                .thenReturn(false);
+        when(sharingHandler.userHasAccess("testGateway", 
"testUser@testGateway", "exp-123", "testGateway:READ"))
+                .thenReturn(false);
+
+        assertThrows(ServiceAuthorizationException.class,
+                () -> 
experimentService.getIntermediateOutputProcessStatus(ctx, "exp-123", 
List.of("output1")));
+    }
+
+    @Test
+    void getIntermediateOutputProcessStatus_returnsLatestStatus() throws 
Exception {
+        when(sharingHandler.userHasAccess("testGateway", 
"testUser@testGateway", "exp-123", "testGateway:OWNER"))
+                .thenReturn(true);
+
+        ExperimentModel experiment = new ExperimentModel();
+        experiment.setUserName("testUser");
+        experiment.setGatewayId("testGateway");
+
+        // Build a process with OUTPUT_FETCHING task and a matching output
+        ProcessModel process = new ProcessModel();
+        process.setLastUpdateTime(1000L);
+        TaskModel task = new TaskModel();
+        task.setTaskType(TaskTypes.OUTPUT_FETCHING);
+        process.addToTasks(task);
+        OutputDataObjectType out = new OutputDataObjectType();
+        out.setName("output1");
+        process.addToProcessOutputs(out);
+        ProcessStatus ps = new ProcessStatus(ProcessState.EXECUTING);
+        process.addToProcessStatuses(ps);
+        experiment.addToProcesses(process);
+
+        when(registryHandler.getExperiment("exp-123")).thenReturn(experiment);
+
+        ProcessStatus result = 
experimentService.getIntermediateOutputProcessStatus(
+                ctx, "exp-123", List.of("output1"));
+
+        assertEquals(ProcessState.EXECUTING, result.getState());
+    }
 }
diff --git 
a/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java
 
b/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java
index 4e3016bae6..7d68a2f1f2 100644
--- 
a/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java
+++ 
b/airavata-api/src/test/java/org/apache/airavata/service/gateway/GatewayServiceTest.java
@@ -119,4 +119,23 @@ class GatewayServiceTest {
         assertEquals(2, result.size());
         verify(registryHandler).getAllUsersInGateway("gw-1");
     }
+
+    @Test
+    void isUserExists_returnsTrueWhenExists() throws Exception {
+        when(registryHandler.isUserExists("gw-1", "alice")).thenReturn(true);
+
+        boolean result = gatewayService.isUserExists(ctx, "gw-1", "alice");
+
+        assertTrue(result);
+        verify(registryHandler).isUserExists("gw-1", "alice");
+    }
+
+    @Test
+    void isUserExists_returnsFalseWhenNotExists() throws Exception {
+        when(registryHandler.isUserExists("gw-1", "nobody")).thenReturn(false);
+
+        boolean result = gatewayService.isUserExists(ctx, "gw-1", "nobody");
+
+        assertFalse(result);
+    }
 }

Reply via email to