This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch feat/airavata-service-layer in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 18b91c6a86cac50bdeb83ace821de376c876dfc5 Author: yasithdev <[email protected]> AuthorDate: Thu Mar 26 12:03:27 2026 -0500 refactor: replace handler methods with ThriftAdapter one-liners and delete orphaned helpers - isUserExists, fetchIntermediateOutputs, getIntermediateOutputProcessStatus, and launchExperiment now delegate to service layer - Delete orphaned private helpers: validateString, submitExperiment, submitCancelExperiment, submitExperimentIntermediateOutputsEvent, shareEntityWithAdminGatewayGroups, userHasAccessInternal, getResourceType, createManageSharingPermissionTypeIfMissing, retrieveGatewayGroups --- .../api/server/handler/AiravataServerHandler.java | 391 +-------------------- 1 file changed, 10 insertions(+), 381 deletions(-) diff --git a/airavata-api/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index 64893c4d37..56a318583f 100644 --- a/airavata-api/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -175,7 +175,7 @@ public class AiravataServerHandler implements Airavata.Iface { this.dataProductService = new DataProductService(registryHandler); this.groupResourceProfileService = new GroupResourceProfileService(registryHandler, sharingHandler); this.parserService = new ParserService(registryHandler); - this.resourceSharingService = new ResourceSharingService(sharingHandler); + this.resourceSharingService = new ResourceSharingService(sharingHandler, registryHandler); } public AiravataServerHandler() throws Exception { @@ -337,17 +337,7 @@ public class AiravataServerHandler implements Airavata.Iface { public boolean isUserExists(AuthzToken authzToken, String gatewayId, String userName) throws InvalidRequestException, AiravataClientException, AiravataSystemException, AuthorizationException, TException { - try { - boolean isExists = registryHandler.isUserExists(gatewayId, userName); - logger.debug("Checking if the user" + userName + "exists in the gateway" + gatewayId); - return isExists; - } catch (Exception e) { - logger.error("Error while verifying user", e); - AiravataSystemException exception = new AiravataSystemException(); - exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR); - exception.setMessage("Error while verifying user. More info : " + e.getMessage()); - throw exception; - } + return ThriftAdapter.execute(authzToken, gatewayId, ctx -> gatewayService.isUserExists(ctx, gatewayId, userName)); } @Override @@ -546,14 +536,6 @@ public class AiravataServerHandler implements Airavata.Iface { ctx -> projectService.deleteProject(ctx, projectId)); } - private boolean validateString(String name) { - boolean valid = true; - if (name == null || name.equals("") || name.trim().length() == 0) { - valid = false; - } - return valid; - } - /** * Get a Project by ID * @@ -975,69 +957,8 @@ public class AiravataServerHandler implements Airavata.Iface { public void fetchIntermediateOutputs(AuthzToken authzToken, String airavataExperimentId, List<String> outputNames) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException { - - try { - - // Verify that user has WRITE access to experiment - final boolean hasAccess = - userHasAccessInternal(authzToken, airavataExperimentId, ResourcePermissionType.WRITE); - if (!hasAccess) { - throw new AuthorizationException("User does not have WRITE access to this experiment"); - } - - // Verify that the experiment's job is currently ACTIVE - ExperimentModel existingExperiment = registryHandler.getExperiment(airavataExperimentId); - List<JobModel> jobs = registryHandler.getJobDetails(airavataExperimentId); - boolean anyJobIsActive = jobs.stream().anyMatch(j -> { - if (j.getJobStatusesSize() > 0) { - return j.getJobStatuses().get(j.getJobStatusesSize() - 1).getJobState() == JobState.ACTIVE; - } else { - return false; - } - }); - if (!anyJobIsActive) { - throw new InvalidRequestException("Experiment does not have currently ACTIVE job"); - } - - // Figure out if there are any currently running intermediate output fetching processes for outputNames - // First, find any existing intermediate output fetch processes for outputNames - List<ProcessModel> intermediateOutputFetchProcesses = existingExperiment.getProcesses().stream() - .filter(p -> { - // Filter out completed or failed processes - if (p.getProcessStatusesSize() > 0) { - ProcessStatus latestStatus = p.getProcessStatuses().get(p.getProcessStatusesSize() - 1); - if (latestStatus.getState() == ProcessState.COMPLETED - || latestStatus.getState() == ProcessState.FAILED) { - return false; - } - } - return true; - }) - .filter(p -> p.getTasks().stream().allMatch(t -> t.getTaskType() == TaskTypes.OUTPUT_FETCHING)) - .filter(p -> p.getProcessOutputs().stream().anyMatch(o -> outputNames.contains(o.getName()))) - .collect(Collectors.toList()); - if (!intermediateOutputFetchProcesses.isEmpty()) { - throw new InvalidRequestException( - "There are already intermediate output fetching tasks running for those outputs."); - } - - String gatewayId = authzToken.getClaimsMap().get(Constants.GATEWAY_ID); - submitExperimentIntermediateOutputsEvent(gatewayId, airavataExperimentId, outputNames); - } catch (InvalidRequestException | AuthorizationException e) { - logger.error(e.getMessage(), e); - throw e; - } catch (Exception e) { - logger.error( - "Error while processing request to fetch intermediate outputs for experiment: " - + airavataExperimentId, - e); - AiravataSystemException exception = new AiravataSystemException(); - exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR); - exception.setMessage( - "Error while processing request to fetch intermediate outputs for experiment. More info : " - + e.getMessage()); - throw exception; - } + ThriftAdapter.executeVoid(authzToken, null, + ctx -> experimentService.fetchIntermediateOutputs(ctx, airavataExperimentId, outputNames)); } @Override @@ -1046,62 +967,8 @@ public class AiravataServerHandler implements Airavata.Iface { AuthzToken authzToken, String airavataExperimentId, List<String> outputNames) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException { - try { - - // Verify that user has READ access to experiment - final boolean hasAccess = - userHasAccessInternal(authzToken, airavataExperimentId, ResourcePermissionType.READ); - if (!hasAccess) { - throw new AuthorizationException("User does not have WRITE access to this experiment"); - } - - ExperimentModel existingExperiment = registryHandler.getExperiment(airavataExperimentId); - - // Find the most recent intermediate output fetching process for the outputNames - // Assumption: only one of these output fetching processes runs at a - // time so we only need to check the status of the most recent one - Optional<ProcessModel> mostRecentOutputFetchProcess = existingExperiment.getProcesses().stream() - .filter(p -> p.getTasks().stream().allMatch(t -> t.getTaskType() == TaskTypes.OUTPUT_FETCHING)) - .filter(p -> { - List<String> names = p.getProcessOutputs().stream() - .map(o -> o.getName()) - .collect(Collectors.toList()); - return new HashSet<>(names).equals(new HashSet<>(outputNames)); - }) - .sorted(Comparator.comparing(ProcessModel::getLastUpdateTime) - .reversed()) - .findFirst(); - - if (!mostRecentOutputFetchProcess.isPresent()) { - throw new InvalidRequestException("No matching intermediate output fetching process found."); - } - - ProcessStatus result; - // Determine the most recent status for the most recent process - ProcessModel process = mostRecentOutputFetchProcess.get(); - if (process.getProcessStatusesSize() > 0) { - result = process.getProcessStatuses().get(process.getProcessStatusesSize() - 1); - } else { - // Process has no statuses so it must be created but not yet running - result = new ProcessStatus(ProcessState.CREATED); - } - - return result; - } catch (InvalidRequestException | AuthorizationException e) { - logger.debug(e.getMessage(), e); - throw e; - } catch (Exception e) { - logger.error( - "Error while processing request to fetch intermediate outputs for experiment: " - + airavataExperimentId, - e); - AiravataSystemException exception = new AiravataSystemException(); - exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR); - exception.setMessage( - "Error while processing request to fetch intermediate outputs for experiment. More info : " - + e.getMessage()); - throw exception; - } + return ThriftAdapter.execute(authzToken, null, + ctx -> experimentService.getIntermediateOutputProcessStatus(ctx, airavataExperimentId, outputNames)); } @SecurityCheck @@ -1151,124 +1018,11 @@ public class AiravataServerHandler implements Airavata.Iface { @SecurityCheck public void launchExperiment(AuthzToken authzToken, final String airavataExperimentId, String gatewayId) throws AuthorizationException, AiravataSystemException, TException { - // TODO: verify that gatewayId matches gatewayId in authzToken logger.info("Launching experiment {}", airavataExperimentId); - try { - ExperimentModel experiment = registryHandler.getExperiment(airavataExperimentId); - - if (experiment == null) { - logger.error( - airavataExperimentId, - "Error while launching experiment, experiment {} doesn't exist.", - airavataExperimentId); - throw new ExperimentNotFoundException( - "Requested experiment id " + airavataExperimentId + " does not exist in the system.."); - } - String username = authzToken.getClaimsMap().get(Constants.USER_NAME); - - // For backwards compatibility, if there is no groupResourceProfileId, look up one that is shared with the - // user - if (!experiment.getUserConfigurationData().isSetGroupResourceProfileId()) { - List<GroupResourceProfile> groupResourceProfiles = getGroupResourceList(authzToken, gatewayId); - logger.info("Checking for groupResourceProfileId for ExpID: " + airavataExperimentId); - if (!groupResourceProfiles.isEmpty()) { - // Just pick the first one - final String groupResourceProfileId = - groupResourceProfiles.get(0).getGroupResourceProfileId(); - logger.warn( - "Experiment {} doesn't have groupResourceProfileId, picking first one user has access to: {}", - airavataExperimentId, - groupResourceProfileId); - experiment.getUserConfigurationData().setGroupResourceProfileId(groupResourceProfileId); - registryHandler.updateExperimentConfiguration( - airavataExperimentId, experiment.getUserConfigurationData()); - } else { - throw new AuthorizationException("User " + username + " in gateway " + gatewayId - + " doesn't have access to any group resource profiles."); - } - } - - // Verify user has READ access to groupResourceProfileId - if (!sharingHandler.userHasAccess( - gatewayId, - username + "@" + gatewayId, - experiment.getUserConfigurationData().getGroupResourceProfileId(), - gatewayId + ":READ")) { - throw new AuthorizationException("User " + username + " in gateway " + gatewayId - + " doesn't have access to group resource profile " - + experiment.getUserConfigurationData().getGroupResourceProfileId()); - } - - // Verify user has READ access to Application Deployment - final String appInterfaceId = experiment.getExecutionId(); - ApplicationInterfaceDescription applicationInterfaceDescription = - registryHandler.getApplicationInterface(appInterfaceId); - - List<String> appModuleIds = applicationInterfaceDescription.getApplicationModules(); - // Assume that there is only one app module for this interface (otherwise, how could we figure out the - // deployment) - String appModuleId = appModuleIds.get(0); - List<ApplicationDeploymentDescription> applicationDeploymentDescriptions = - registryHandler.getApplicationDeployments(appModuleId); - - if (!experiment.getUserConfigurationData().isAiravataAutoSchedule()) { - final String resourceHostId = experiment - .getUserConfigurationData() - .getComputationalResourceScheduling() - .getResourceHostId(); - - Optional<ApplicationDeploymentDescription> applicationDeploymentDescription = - applicationDeploymentDescriptions.stream() - .filter(dep -> dep.getComputeHostId().equals(resourceHostId)) - .findFirst(); - if (applicationDeploymentDescription.isPresent()) { - final String appDeploymentId = - applicationDeploymentDescription.get().getAppDeploymentId(); - if (!sharingHandler.userHasAccess( - gatewayId, username + "@" + gatewayId, appDeploymentId, gatewayId + ":READ")) { - throw new AuthorizationException("User " + username + " in gateway " + gatewayId - + " doesn't have access to app deployment " + appDeploymentId); - } - } else { - throw new InvalidRequestException("Application deployment doesn't exist for application interface " - + appInterfaceId + " and host " + resourceHostId + " in gateway " + gatewayId); - } - } else if (experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList() != null - && !experiment - .getUserConfigurationData() - .getAutoScheduledCompResourceSchedulingList() - .isEmpty()) { - List<ComputationalResourceSchedulingModel> compResourceSchedulingList = - experiment.getUserConfigurationData().getAutoScheduledCompResourceSchedulingList(); - for (ComputationalResourceSchedulingModel crScheduling : compResourceSchedulingList) { - Optional<ApplicationDeploymentDescription> applicationDeploymentDescription = - applicationDeploymentDescriptions.stream() - .filter(dep -> dep.getComputeHostId().equals(crScheduling.getResourceHostId())) - .findFirst(); - if (applicationDeploymentDescription.isPresent()) { - final String appDeploymentId = - applicationDeploymentDescription.get().getAppDeploymentId(); - if (!sharingHandler.userHasAccess( - gatewayId, username + "@" + gatewayId, appDeploymentId, gatewayId + ":READ")) { - throw new AuthorizationException("User " + username + " in gateway " + gatewayId - + " doesn't have access to app deployment " + appDeploymentId); - } - } - } - } - submitExperiment(gatewayId, airavataExperimentId); - logger.info("Experiment with ExpId: " + airavataExperimentId + " was submitted in gateway with gatewayID: " - + gatewayId); - } catch (InvalidRequestException | ExperimentNotFoundException | AuthorizationException e) { - logger.error(e.getMessage(), e); - throw e; - } catch (Exception e1) { - logger.error(airavataExperimentId, "Error while instantiate the registry instance", e1); - AiravataSystemException exception = new AiravataSystemException(); - exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR); - exception.setMessage("Error while instantiate the registry instance. More info : " + e1.getMessage()); - throw exception; - } + ThriftAdapter.executeVoid(authzToken, gatewayId, ctx -> { + List<GroupResourceProfile> groupResourceProfiles = groupResourceProfileService.getGroupResourceList(ctx, gatewayId); + experimentService.launchExperiment(ctx, airavataExperimentId, gatewayId, groupResourceProfiles); + }); } // private OrchestratorService.Client getOrchestratorClient() throws TException { @@ -3525,131 +3279,6 @@ public class AiravataServerHandler implements Airavata.Iface { return ThriftAdapter.execute(authzToken, gatewayId, ctx -> parserService.listAllParsingTemplates(ctx, gatewayId)); } - private void submitExperiment(String gatewayId, String experimentId) throws AiravataException { - ExperimentSubmitEvent event = new ExperimentSubmitEvent(experimentId, gatewayId); - MessageContext messageContext = new MessageContext( - event, MessageType.EXPERIMENT, "LAUNCH.EXP-" + UUID.randomUUID().toString(), gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - if (experimentPublisher != null) { - experimentPublisher.publish(messageContext); - } - } - - private void submitCancelExperiment(String gatewayId, String experimentId) throws AiravataException { - ExperimentSubmitEvent event = new ExperimentSubmitEvent(experimentId, gatewayId); - MessageContext messageContext = new MessageContext( - event, - MessageType.EXPERIMENT_CANCEL, - "CANCEL.EXP-" + UUID.randomUUID().toString(), - gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - if (experimentPublisher != null) { - experimentPublisher.publish(messageContext); - } - } - - private void submitExperimentIntermediateOutputsEvent( - String gatewayId, String experimentId, List<String> outputNames) throws AiravataException { - - ExperimentIntermediateOutputsEvent event = - new ExperimentIntermediateOutputsEvent(experimentId, gatewayId, outputNames); - MessageContext messageContext = new MessageContext( - event, - MessageType.INTERMEDIATE_OUTPUTS, - "INTERMEDIATE_OUTPUTS.EXP-" + UUID.randomUUID().toString(), - gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - if (experimentPublisher != null) { - experimentPublisher.publish(messageContext); - } - } - - private void shareEntityWithAdminGatewayGroups(Entity entity) throws TException { - final String domainId = entity.getDomainId(); - GatewayGroups gatewayGroups = retrieveGatewayGroups(domainId); - createManageSharingPermissionTypeIfMissing(domainId); - sharingHandler.shareEntityWithGroups( - domainId, - entity.getEntityId(), - Arrays.asList(gatewayGroups.getAdminsGroupId()), - domainId + ":MANAGE_SHARING", - true); - sharingHandler.shareEntityWithGroups( - domainId, - entity.getEntityId(), - Arrays.asList(gatewayGroups.getAdminsGroupId()), - domainId + ":WRITE", - true); - sharingHandler.shareEntityWithGroups( - domainId, - entity.getEntityId(), - Arrays.asList(gatewayGroups.getAdminsGroupId(), gatewayGroups.getReadOnlyAdminsGroupId()), - domainId + ":READ", - true); - } - - private boolean userHasAccessInternal( - AuthzToken authzToken, String entityId, ResourcePermissionType permissionType) { - final String domainId = authzToken.getClaimsMap().get(Constants.GATEWAY_ID); - final String userId = authzToken.getClaimsMap().get(Constants.USER_NAME) + "@" + domainId; - try { - final boolean hasOwnerAccess = sharingHandler.userHasAccess( - domainId, userId, entityId, domainId + ":" + ResourcePermissionType.OWNER); - boolean hasAccess = false; - if (permissionType.equals(ResourcePermissionType.WRITE)) { - hasAccess = hasOwnerAccess - || sharingHandler.userHasAccess( - domainId, userId, entityId, domainId + ":" + ResourcePermissionType.WRITE); - } else if (permissionType.equals(ResourcePermissionType.READ)) { - hasAccess = hasOwnerAccess - || sharingHandler.userHasAccess( - domainId, userId, entityId, domainId + ":" + ResourcePermissionType.READ); - } else if (permissionType.equals(ResourcePermissionType.MANAGE_SHARING)) { - hasAccess = hasOwnerAccess - || sharingHandler.userHasAccess( - domainId, userId, entityId, domainId + ":" + ResourcePermissionType.MANAGE_SHARING); - } else if (permissionType.equals(ResourcePermissionType.OWNER)) { - hasAccess = hasOwnerAccess; - } - return hasAccess; - } catch (Exception e) { - throw new RuntimeException("Unable to check if user has access", e); - } - } - - private ResourceType getResourceType(String domainId, String entityId) throws TException { - Entity entity = sharingHandler.getEntity(domainId, entityId); - for (ResourceType resourceType : ResourceType.values()) { - if (entity.getEntityTypeId().equals(domainId + ":" + resourceType.name())) { - return resourceType; - } - } - throw new RuntimeException("Unrecognized entity type id: " + entity.getEntityTypeId()); - } - - private void createManageSharingPermissionTypeIfMissing(String domainId) throws TException { - // AIRAVATA-3297 Some gateways were created without the MANAGE_SHARING permission, so add it if missing - String permissionTypeId = domainId + ":MANAGE_SHARING"; - if (!sharingHandler.isPermissionExists(domainId, permissionTypeId)) { - PermissionType permissionType = new PermissionType(); - permissionType.setPermissionTypeId(permissionTypeId); - permissionType.setDomainId(domainId); - permissionType.setName("MANAGE_SHARING"); - permissionType.setDescription("Manage sharing permission type"); - sharingHandler.createPermissionType(permissionType); - logger.info("Created MANAGE_SHARING permission type for domain " + domainId); - } - } - - private GatewayGroups retrieveGatewayGroups(String gatewayId) throws TException { - - if (registryHandler.isGatewayGroupsExists(gatewayId)) { - return registryHandler.getGatewayGroups(gatewayId); - } else { - return GatewayGroupsInitializer.initializeGatewayGroups(gatewayId); - } - } - /** * To hold storage info context (login username, credential token, and adaptor) */
