This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch worktree-cleanup+lean-core in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 165d2bbbad687a1d49e21c2e14dd57505eeaa46f Author: yasithdev <[email protected]> AuthorDate: Mon Mar 30 07:21:18 2026 -0400 cleanup: consolidate load-client as Python, merge batch/storage scripts, delete old Java source Co-Authored-By: Claude Sonnet 4.6 <[email protected]> --- dev-tools/batch_launch_experiments.py | 479 ------------------- dev-tools/create_launch_experiment_with_storage.py | 267 ----------- dev-tools/load-client/README.md | 42 ++ dev-tools/load-client/load-config.yml | 19 + dev-tools/load-client/load_client.py | 526 +++++++++++++++++++++ dev-tools/load-client/pom.xml | 99 ---- dev-tools/load-client/requirements.txt | 3 + .../src/main/assembly/load-client-bin-assembly.xml | 66 --- .../apache/airavata/tools/load/Authenticator.java | 69 --- .../apache/airavata/tools/load/Configuration.java | 250 ---------- .../apache/airavata/tools/load/Configurations.java | 52 -- .../org/apache/airavata/tools/load/LoadClient.java | 180 ------- .../apache/airavata/tools/load/StatusMonitor.java | 149 ------ .../tools/load/StorageResourceManager.java | 128 ----- .../org/apache/airavata/tools/load/UnitLoad.java | 186 -------- .../src/main/resources/bin/load-client.sh | 56 --- .../load-client/src/main/resources/bin/setenv.sh | 146 ------ .../src/main/resources/conf/load-config.yml | 27 -- 18 files changed, 590 insertions(+), 2154 deletions(-) diff --git a/dev-tools/batch_launch_experiments.py b/dev-tools/batch_launch_experiments.py deleted file mode 100644 index 8edaa47a7d..0000000000 --- a/dev-tools/batch_launch_experiments.py +++ /dev/null @@ -1,479 +0,0 @@ -#!/usr/bin/env python3 - -import os -import sys -import time -import logging -from concurrent.futures import ThreadPoolExecutor -from typing import List, Dict, Any, Union -import pydantic -from rich.progress import Progress, BarColumn, TextColumn, SpinnerColumn, TimeElapsedColumn -from rich.console import Console -from rich.live import Live -from rich.panel import Panel -from rich.layout import Layout -from collections import deque - -os.environ['AUTH_SERVER_URL'] = "https://auth.dev.cybershuttle.org" -os.environ['API_SERVER_HOSTNAME'] = "api.dev.cybershuttle.org" -os.environ['GATEWAY_URL'] = "https://gateway.dev.cybershuttle.org" -os.environ['STORAGE_RESOURCE_HOST'] = "gateway.dev.cybershuttle.org" - -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from create_launch_experiment_with_storage import create_and_launch_experiment -from airavata_experiments.airavata import AiravataOperator -from airavata.model.status.ttypes import ExperimentState - - -class ExperimentLaunchResult(pydantic.BaseModel): - """Result from creating and launching an experiment.""" - experiment_id: str - process_id: str - experiment_dir: str - storage_host: str - mount_point: str - - -class JobConfig(pydantic.BaseModel): - """Configuration for a batch job submission.""" - experiment_name: str - project_name: str - application_name: str - computation_resource_name: str - queue_name: str - node_count: int - cpu_count: int - walltime: int - group_name: str = "Default" - input_storage_host: str | None = None - output_storage_host: str | None = None - input_files: Dict[str, Union[str, List[str]]] | None = None - data_inputs: Dict[str, Union[str, int, float]] | None = None - gateway_id: str | None = None - auto_schedule: bool = False - - -class JobResult(pydantic.BaseModel): - """Result from submitting and monitoring a single job.""" - job_index: int - experiment_id: str | None - status: str - result: ExperimentLaunchResult | None = None - success: bool - error: str | None = None - - -def get_experiment_state_value(status) -> tuple[int, str, ExperimentState]: - """Extract state value, name, and enum from status. Returns (value, name, enum).""" - if isinstance(status, ExperimentState): - return status.value, status.name, status - - # Handle ExperimentStatus object - if hasattr(status, 'state'): - state = status.state - if isinstance(state, ExperimentState): - return state.value, state.name, state - elif hasattr(state, 'value'): - return state.value, state.name if hasattr(state, 'name') else str(state), state - - # Handle direct value/name access - status_value = status.value if hasattr(status, 'value') else (status if isinstance(status, int) else None) - status_name = status.name if hasattr(status, 'name') else str(status) - - # Convert to ExperimentState enum - if status_value is not None: - try: - enum_state = ExperimentState(status_value) - return status_value, status_name, enum_state - except (ValueError, TypeError): - pass - - # Fallback - return None, status_name, ExperimentState.FAILED - - -def monitor_experiment_silent(operator: AiravataOperator, experiment_id: str, check_interval: int = 30) -> ExperimentState: - """Monitor experiment silently until completion. Returns final status.""" - logger = logging.getLogger(__name__) - max_checks = 3600 # Maximum number of checks (about 5 hours at 5s interval) - check_count = 0 - - # Use shorter interval initially, then increase - initial_interval = min(check_interval, 5) # Check every 5 seconds initially - - while check_count < max_checks: - try: - status = operator.get_experiment_status(experiment_id) - - # Extract state information - status_value, status_name, status_enum = get_experiment_state_value(status) - - # Log status periodically for debugging - if check_count % 12 == 0: # Log every minute (12 * 5s) - logger.debug(f"Experiment {experiment_id} status check {check_count}: value={status_value}, name={status_name}") - - # Check terminal states: COMPLETED (7), CANCELED (6), FAILED (8) - if status_value is not None: - is_terminal = status_value in [ - ExperimentState.COMPLETED.value, # 7 - ExperimentState.CANCELED.value, # 6 - ExperimentState.FAILED.value # 8 - ] - else: - is_terminal = status_name in ['COMPLETED', 'CANCELED', 'FAILED'] - - if is_terminal: - logger.info(f"Experiment {experiment_id} reached terminal state: {status_name} (value: {status_value})") - return status_enum - - except Exception as e: - # If we can't get status, log but continue monitoring - logger.warning(f"Error checking experiment {experiment_id} status (check {check_count}): {e}") - import traceback - logger.debug(traceback.format_exc()) - if check_count > 10: # After several failed checks, assume failed - logger.error(f"Multiple status check failures for {experiment_id}, assuming FAILED") - return ExperimentState.FAILED - - # Sleep before next check - sleep_time = initial_interval if check_count < 6 else check_interval - time.sleep(sleep_time) - check_count += 1 - - # If we've exceeded max checks, assume failed - logger.error(f"Experiment {experiment_id} monitoring timeout after {check_count} checks, assuming FAILED") - return ExperimentState.FAILED - - -def submit_and_monitor_job( - job_index: int, - job_config: JobConfig | Dict[str, Any], - access_token: str, -) -> JobResult: - """Submit and monitor a single job. Returns job result with status.""" - # Convert dict to JobConfig if needed - if isinstance(job_config, dict): - job_config = JobConfig(**job_config) - - try: - # Make experiment name unique for each job to avoid directory conflicts - # Using job_index ensures uniqueness and makes it easy to track - unique_experiment_name = f"{job_config.experiment_name}-job{job_index}" - - # Handle input_files and data_inputs same way as working version - input_files = job_config.input_files if job_config.input_files else None - data_inputs = job_config.data_inputs if job_config.data_inputs else None - - result_dict = create_and_launch_experiment( - access_token=access_token, - experiment_name=unique_experiment_name, - project_name=job_config.project_name, - application_name=job_config.application_name, - computation_resource_name=job_config.computation_resource_name, - queue_name=job_config.queue_name, - node_count=job_config.node_count, - cpu_count=job_config.cpu_count, - walltime=job_config.walltime, - group_name=job_config.group_name, - input_storage_host=job_config.input_storage_host, - output_storage_host=job_config.output_storage_host, - input_files=input_files, - data_inputs=data_inputs, - gateway_id=job_config.gateway_id, - auto_schedule=job_config.auto_schedule, - monitor=False, - ) - - operator = AiravataOperator(access_token=access_token) - experiment_id = result_dict['experiment_id'] - - # Check status immediately after submission to catch early failures - try: - initial_status = operator.get_experiment_status(experiment_id) - status_value, status_name, status_enum = get_experiment_state_value(initial_status) - - # Check if already in terminal state - if status_value is not None and status_value in [ - ExperimentState.COMPLETED.value, - ExperimentState.CANCELED.value, - ExperimentState.FAILED.value - ]: - # Already in terminal state - final_status = status_enum - else: - # Monitor until completion - final_status = monitor_experiment_silent(operator, experiment_id) - except Exception as e: - # If we can't check status, log and assume failed - logger = logging.getLogger(__name__) - logger.error(f"Error monitoring experiment {experiment_id}: {e}") - import traceback - logger.debug(traceback.format_exc()) - final_status = ExperimentState.FAILED - - result = ExperimentLaunchResult(**result_dict) - - return JobResult( - job_index=job_index, - experiment_id=result.experiment_id, - status=final_status.name, - result=result, - success=final_status == ExperimentState.COMPLETED, - ) - except Exception as e: - # Log the error for debugging - import traceback - error_msg = f"{str(e)}\n{traceback.format_exc()}" - logger = logging.getLogger(__name__) - logger.error(f"Job {job_index} failed: {error_msg}") - - return JobResult( - job_index=job_index, - experiment_id=None, - status='ERROR', - result=None, - success=False, - error=str(e), - ) - - -def batch_submit_jobs( - job_config: JobConfig | Dict[str, Any], - num_copies: int = 10, - max_concurrent: int = 5, - access_token: str | None = None, -) -> List[JobResult]: - """Submit multiple job copies in batches with progress bar.""" - if access_token is None: - from airavata_auth.device_auth import AuthContext - access_token = AuthContext.get_access_token() - - console = Console() - results = [] - log_buffer = deque(maxlen=50) # Keep last 50 log lines for display - - # Custom handler to capture logs to buffer - class ListHandler(logging.Handler): - def __init__(self, buffer): - super().__init__() - self.buffer = buffer - - def emit(self, record): - msg = self.format(record) - self.buffer.append(msg) - - log_handler = ListHandler(log_buffer) - log_handler.setLevel(logging.INFO) - log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) - - # Add to root logger and module logger - logging.root.addHandler(log_handler) - logger = logging.getLogger('create_launch_experiment_with_storage') - logger.addHandler(log_handler) - - # Configure progress bar - progress = Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - TextColumn("•"), - TextColumn("{task.completed}/{task.total}"), - TimeElapsedColumn(), - console=console, - ) - - task = progress.add_task( - f"{num_copies} total, 0 running, 0 completed, 0 failed", - total=num_copies - ) - - # Create layout with logs above and progress below - layout = Layout() - layout.split_column( - Layout(name="logs", size=None), - Layout(progress, name="progress", size=3) - ) - - def make_display(): - # Get logs from buffer - always show the latest logs (they're added to end of deque) - log_lines = list(log_buffer) if log_buffer else ["No logs yet..."] - # Show last 20 lines to keep display manageable and scrolled to bottom - display_lines = log_lines[-20:] if len(log_lines) > 20 else log_lines - log_text = '\n'.join(display_lines) - log_panel = Panel( - log_text, - title="Logs (latest)", - border_style="blue", - height=None, - expand=False - ) - layout["logs"].update(log_panel) - return layout - - try: - # Use Live to keep layout fixed, progress at bottom - with Live(make_display(), console=console, refresh_per_second=4, screen=True) as live: - with ThreadPoolExecutor(max_workers=max_concurrent) as executor: - active_futures = {} - next_job_index = 0 - - # Submit initial batch - while next_job_index < min(max_concurrent, num_copies): - future = executor.submit(submit_and_monitor_job, next_job_index, job_config, access_token) - active_futures[future] = next_job_index - next_job_index += 1 - - # Process completed jobs and submit new ones - # Continue until all jobs are submitted AND all active futures are done - while active_futures or next_job_index < num_copies: - completed_futures = [f for f in active_futures if f.done()] - - for future in completed_futures: - job_idx = active_futures.pop(future) - - try: - result = future.result() - results.append(result) - except Exception as e: - # Handle unexpected exceptions - results.append(JobResult( - job_index=job_idx, - experiment_id=None, - status='ERROR', - result=None, - success=False, - error=str(e), - )) - - # Submit next jobs if available and we have capacity - while next_job_index < num_copies and len(active_futures) < max_concurrent: - try: - new_future = executor.submit(submit_and_monitor_job, next_job_index, job_config, access_token) - active_futures[new_future] = next_job_index - next_job_index += 1 - except Exception as e: - # If submission itself fails, mark as error and continue - results.append(JobResult( - job_index=next_job_index, - experiment_id=None, - status='ERROR', - result=None, - success=False, - error=f"Submission failed: {str(e)}", - )) - next_job_index += 1 - - # Update progress bar with counts - completed_count = len(results) - running_count = len(active_futures) - submitted_count = next_job_index - successful_count = sum(1 for r in results if r.success) - failed_count = completed_count - successful_count - - # Show submitted count if not all jobs submitted yet - if submitted_count < num_copies: - status_desc = f"{num_copies} total, {submitted_count} submitted, {running_count} running, {completed_count} completed, {failed_count} failed" - else: - status_desc = f"{num_copies} total, {running_count} running, {completed_count} completed, {failed_count} failed" - - progress.update( - task, - completed=completed_count, - description=status_desc - ) - live.update(make_display()) - - if not completed_futures and next_job_index >= num_copies: - # Only sleep if nothing changed - time.sleep(1) - - # Sort results by job_index - results.sort(key=lambda x: x.job_index) - return results - finally: - # Clean up log handlers - logging.root.removeHandler(log_handler) - if log_handler in logger.handlers: - logger.removeHandler(log_handler) - - -def main(): - """Main function that sets up job configuration and runs batch submission.""" - from airavata_auth.device_auth import AuthContext - - access_token = AuthContext.get_access_token() - - # Job configuration - matching create_launch_experiment_with_storage.py exactly - EXPERIMENT_NAME = "Test" - PROJECT_NAME = "Default Project" - APPLICATION_NAME = "NAMD-test" - GATEWAY_ID = None - COMPUTATION_RESOURCE_NAME = "NeuroData25VC2" - QUEUE_NAME = "cloud" - NODE_COUNT = 1 - CPU_COUNT = 1 - WALLTIME = 5 - GROUP_NAME = "Default" - INPUT_STORAGE_HOST = "gateway.dev.cybershuttle.org" - OUTPUT_STORAGE_HOST = "149.165.169.12" - INPUT_FILES = {} - DATA_INPUTS = {} - AUTO_SCHEDULE = False - - job_config = JobConfig( - experiment_name=EXPERIMENT_NAME, - project_name=PROJECT_NAME, - application_name=APPLICATION_NAME, - computation_resource_name=COMPUTATION_RESOURCE_NAME, - queue_name=QUEUE_NAME, - node_count=NODE_COUNT, - cpu_count=CPU_COUNT, - walltime=WALLTIME, - group_name=GROUP_NAME, - input_storage_host=INPUT_STORAGE_HOST, - output_storage_host=OUTPUT_STORAGE_HOST, - input_files=INPUT_FILES if INPUT_FILES else None, - data_inputs=DATA_INPUTS if DATA_INPUTS else None, - gateway_id=GATEWAY_ID, - auto_schedule=AUTO_SCHEDULE, - ) - - num_copies = 10 - - try: - results = batch_submit_jobs( - job_config=job_config, - num_copies=num_copies, - max_concurrent=5, - access_token=access_token, - ) - - # Print summary - print("\n" + "="*60) - print(f"Batch submission complete: {num_copies} jobs") - print("="*60) - successful = sum(1 for r in results if r.success) - print(f"Successful: {successful}/{num_copies}") - print(f"Failed: {num_copies - successful}/{num_copies}") - print("\nJob Results:") - for result in results: - status_symbol = "✓" if result.success else "✗" - exp_id = result.experiment_id or 'N/A' - print(f" {status_symbol} Job {result.job_index}: {result.status} " - f"(ID: {exp_id})") - print("="*60) - - return results - - except Exception as e: - print(f"Failed to run batch submission: {repr(e)}", file=sys.stderr) - import traceback - traceback.print_exc() - sys.exit(1) - - -if __name__ == "__main__": - main() - diff --git a/dev-tools/create_launch_experiment_with_storage.py b/dev-tools/create_launch_experiment_with_storage.py deleted file mode 100755 index 7d333e1234..0000000000 --- a/dev-tools/create_launch_experiment_with_storage.py +++ /dev/null @@ -1,267 +0,0 @@ -#!/usr/bin/env python3 - -import os -import sys -import time -import logging -from pathlib import Path - -os.environ['AUTH_SERVER_URL'] = "https://auth.dev.cybershuttle.org" -os.environ['API_SERVER_HOSTNAME'] = "api.dev.cybershuttle.org" -os.environ['GATEWAY_URL'] = "https://gateway.dev.cybershuttle.org" -os.environ['STORAGE_RESOURCE_HOST'] = "gateway.dev.cybershuttle.org" - -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from airavata_experiments.airavata import AiravataOperator -from airavata.model.status.ttypes import ExperimentState -from airavata_auth.device_auth import AuthContext - -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - - -def list_storage_resources(access_token: str, gateway_id: str | None = None): - operator = AiravataOperator(access_token=access_token) - sr_names = operator.api_server_client.get_all_storage_resource_names(operator.airavata_token) - logger.info("Available storage resources:") - for sr_id, hostname in sr_names.items(): - logger.info(f" ID: {sr_id}, Hostname: {hostname}") - return sr_names - - -def get_storage_hostname_by_id(access_token: str, storage_resource_id: str) -> str | None: - operator = AiravataOperator(access_token=access_token) - sr_names = operator.api_server_client.get_all_storage_resource_names(operator.airavata_token) - hostname = sr_names.get(storage_resource_id) - if hostname: - logger.info(f"Storage ID {storage_resource_id} maps to hostname: {hostname}") - else: - logger.warning(f"Storage ID {storage_resource_id} not found in available resources") - return hostname - - -def create_and_launch_experiment( - access_token: str, - experiment_name: str, - project_name: str, - application_name: str, - computation_resource_name: str, - queue_name: str, - node_count: int, - cpu_count: int, - walltime: int, - group_name: str = "Default", - input_storage_host: str | None = None, - output_storage_host: str | None = None, - input_files: dict[str, str | list[str]] | None = None, - data_inputs: dict[str, str | int | float] | None = None, - gateway_id: str | None = None, - auto_schedule: bool = False, - monitor: bool = True, -) -> dict: - operator = AiravataOperator(access_token=access_token) - - experiment_inputs = {} - - if input_files: - for input_name, file_paths in input_files.items(): - if isinstance(file_paths, list): - experiment_inputs[input_name] = { - "type": "uri[]", - "value": [str(Path(fp).resolve()) for fp in file_paths] - } - logger.info(f"Added file array input '{input_name}': {file_paths}") - else: - experiment_inputs[input_name] = { - "type": "uri", - "value": str(Path(file_paths).resolve()) - } - logger.info(f"Added file input '{input_name}': {file_paths}") - - if data_inputs: - for input_name, value in data_inputs.items(): - if isinstance(value, int): - experiment_inputs[input_name] = {"type": "int", "value": value} - elif isinstance(value, float): - experiment_inputs[input_name] = {"type": "float", "value": value} - else: - experiment_inputs[input_name] = {"type": "string", "value": str(value)} - logger.info(f"Added data input '{input_name}': {value}") - - if not experiment_inputs: - logger.info("No inputs provided. Adding dummy input for applications that don't require inputs...") - experiment_inputs = {"__no_inputs__": {"type": "string", "value": ""}} - - logger.info(f"Launching experiment '{experiment_name}'...") - logger.info(f" Project: {project_name}") - logger.info(f" Application: {application_name}") - logger.info(f" Compute Resource: {computation_resource_name}") - logger.info(f" Input Storage: {input_storage_host or 'default'}") - logger.info(f" Output Storage: {output_storage_host or input_storage_host or 'default'}") - - launch_state = operator.launch_experiment( - experiment_name=experiment_name, - project=project_name, - app_name=application_name, - inputs=experiment_inputs, - computation_resource_name=computation_resource_name, - queue_name=queue_name, - node_count=node_count, - cpu_count=cpu_count, - walltime=walltime, - group=group_name, - gateway_id=gateway_id, - input_sr_host=input_storage_host, - output_sr_host=output_storage_host, - auto_schedule=auto_schedule, - ) - - logger.info(f"Experiment launched successfully!") - logger.info(f" Experiment ID: {launch_state.experiment_id}") - logger.info(f" Process ID: {launch_state.process_id}") - logger.info(f" Experiment Directory: {launch_state.experiment_dir}") - logger.info(f" Storage Host: {launch_state.sr_host}") - - result = { - "experiment_id": launch_state.experiment_id, - "process_id": launch_state.process_id, - "experiment_dir": launch_state.experiment_dir, - "storage_host": launch_state.sr_host, - "mount_point": str(launch_state.mount_point), - } - - if monitor: - logger.info("Monitoring experiment status...") - monitor_experiment(operator, launch_state.experiment_id) - - return result - - -def get_experiment_state_value(status) -> tuple[int, str, ExperimentState]: - """Extract state value, name, and enum from status. Returns (value, name, enum).""" - if isinstance(status, ExperimentState): - return status.value, status.name, status - - # Handle ExperimentStatus object - if hasattr(status, 'state'): - state = status.state - if isinstance(state, ExperimentState): - return state.value, state.name, state - elif hasattr(state, 'value'): - return state.value, state.name if hasattr(state, 'name') else str(state), state - - # Handle direct value/name access - status_value = status.value if hasattr(status, 'value') else (status if isinstance(status, int) else None) - status_name = status.name if hasattr(status, 'name') else str(status) - - # Convert to ExperimentState enum - if status_value is not None: - try: - enum_state = ExperimentState(status_value) - return status_value, status_name, enum_state - except (ValueError, TypeError): - pass - - # Fallback - return None, status_name, ExperimentState.FAILED - - -def monitor_experiment(operator: AiravataOperator, experiment_id: str, check_interval: int = 30): - logger.info(f"Monitoring experiment {experiment_id}...") - - while True: - try: - status = operator.get_experiment_status(experiment_id) - status_value, status_name, status_enum = get_experiment_state_value(status) - logger.info(f"Experiment status: {status_name} (value: {status_value})") - - # Check terminal states: COMPLETED (7), CANCELED (6), FAILED (8) - if status_value is not None: - is_terminal = status_value in [ - ExperimentState.COMPLETED.value, # 7 - ExperimentState.CANCELED.value, # 6 - ExperimentState.FAILED.value # 8 - ] - else: - is_terminal = status_name in ['COMPLETED', 'CANCELED', 'FAILED'] - - if is_terminal: - logger.info(f"Experiment finished with state: {status_name}") - break - except Exception as e: - logger.error(f"Error checking experiment {experiment_id} status: {e}") - import traceback - logger.debug(traceback.format_exc()) - # Continue monitoring despite errors - - time.sleep(check_interval) - - -def main(): - logger.info("Authenticating...") - ACCESS_TOKEN = AuthContext.get_access_token() - - EXPERIMENT_NAME = "Test" - PROJECT_NAME = "Default Project" - APPLICATION_NAME = "NAMD-test" - GATEWAY_ID = None - - COMPUTATION_RESOURCE_NAME = "NeuroData25VC2" - QUEUE_NAME = "cloud" - NODE_COUNT = 1 - CPU_COUNT = 1 - WALLTIME = 5 - GROUP_NAME = "Default" - - INPUT_STORAGE_HOST = "gateway.dev.cybershuttle.org" - OUTPUT_STORAGE_HOST = "149.165.169.12" - - INPUT_FILES = {} - DATA_INPUTS = {} - - AUTO_SCHEDULE = False - MONITOR = True - - try: - result = create_and_launch_experiment( - access_token=ACCESS_TOKEN, - experiment_name=EXPERIMENT_NAME, - project_name=PROJECT_NAME, - application_name=APPLICATION_NAME, - computation_resource_name=COMPUTATION_RESOURCE_NAME, - queue_name=QUEUE_NAME, - node_count=NODE_COUNT, - cpu_count=CPU_COUNT, - walltime=WALLTIME, - group_name=GROUP_NAME, - input_storage_host=INPUT_STORAGE_HOST, - output_storage_host=OUTPUT_STORAGE_HOST, - input_files=INPUT_FILES if INPUT_FILES else None, - data_inputs=DATA_INPUTS if DATA_INPUTS else None, - gateway_id=GATEWAY_ID, - auto_schedule=AUTO_SCHEDULE, - monitor=MONITOR, - ) - - logger.info("\n" + "="*60) - logger.info("Experiment created and launched successfully!") - logger.info("="*60) - logger.info(f"Experiment ID: {result['experiment_id']}") - logger.info(f"Process ID: {result['process_id']}") - logger.info(f"Experiment Directory: {result['experiment_dir']}") - logger.info(f"Storage Host: {result['storage_host']}") - logger.info("="*60) - - return result - - except Exception as e: - logger.error(f"Failed to create/launch experiment: {repr(e)}", exc_info=True) - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/dev-tools/load-client/README.md b/dev-tools/load-client/README.md new file mode 100644 index 0000000000..42477bc471 --- /dev/null +++ b/dev-tools/load-client/README.md @@ -0,0 +1,42 @@ +# Airavata Load Client + +Python-based load testing and experiment launching tool for Apache Airavata. + +## Prerequisites + +- Python 3.10+ +- `airavata-python-sdk` installed (from `../../airavata-python-sdk/`) +- Dependencies: `pip install -r requirements.txt` + +## Modes + +### Single Experiment + +```bash +python load_client.py single \ + --experiment-name "MyTest" \ + --project "Default Project" \ + --application "Echo" \ + --resource "localhost" \ + --queue "normal" \ + --walltime 5 +``` + +### Batch (N copies of a scenario) + +```bash +python load_client.py batch \ + --config load-config.yml \ + --scenario "Echo Test" \ + --copies 10 +``` + +### Load Test (all scenarios from config) + +```bash +python load_client.py load --config load-config.yml +``` + +## Configuration + +Edit `load-config.yml` to define scenarios. Each scenario specifies the experiment template, compute resources, inputs, and concurrency parameters. diff --git a/dev-tools/load-client/load-config.yml b/dev-tools/load-client/load-config.yml new file mode 100644 index 0000000000..b0c10bc1e3 --- /dev/null +++ b/dev-tools/load-client/load-config.yml @@ -0,0 +1,19 @@ +# Load testing configuration for Airavata API +# Each scenario defines an experiment template + concurrency parameters + +scenarios: + - name: "Echo Test" + experiment_name: "LoadTest-Echo" + project_name: "Default Project" + application_name: "Echo" + computation_resource_name: "localhost" + queue_name: "normal" + node_count: 1 + cpu_count: 1 + walltime: 5 + input_files: {} + data_inputs: + Input-to-Echo: "Hello from load test" + concurrent_users: 5 + iterations_per_user: 2 + delay_between_submissions_ms: 500 diff --git a/dev-tools/load-client/load_client.py b/dev-tools/load-client/load_client.py new file mode 100644 index 0000000000..057702c94f --- /dev/null +++ b/dev-tools/load-client/load_client.py @@ -0,0 +1,526 @@ +#!/usr/bin/env python3 +""" +Airavata Load Client — unified CLI for experiment launching and load testing. + +Modes: + single — create and launch one experiment + batch — launch N copies of a scenario from config + load — run all scenarios from YAML config (concurrent users, metrics) +""" + +import argparse +import csv +import logging +import os +import sys +import time +from collections import deque +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional, Union + +import yaml +from pydantic import BaseModel +from rich.console import Console +from rich.layout import Layout +from rich.live import Live +from rich.panel import Panel +from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn + +os.environ.setdefault('AUTH_SERVER_URL', "https://auth.dev.cybershuttle.org") +os.environ.setdefault('API_SERVER_HOSTNAME', "api.dev.cybershuttle.org") +os.environ.setdefault('GATEWAY_URL', "https://gateway.dev.cybershuttle.org") +os.environ.setdefault('STORAGE_RESOURCE_HOST', "gateway.dev.cybershuttle.org") + +from airavata_auth.device_auth import AuthContext +from airavata_experiments.airavata import AiravataOperator +from airavata.model.status.ttypes import ExperimentState + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', +) +logger = logging.getLogger(__name__) +console = Console() + + +# --------------------------------------------------------------------------- +# Pydantic models +# --------------------------------------------------------------------------- + +class ScenarioConfig(BaseModel): + name: str + experiment_name: str + project_name: str + application_name: str + computation_resource_name: str + queue_name: str + node_count: int + cpu_count: int + walltime: int + group_name: str = "Default" + input_storage_host: Optional[str] = None + output_storage_host: Optional[str] = None + input_files: Dict[str, Union[str, List[str]]] = {} + data_inputs: Dict[str, Union[str, int, float]] = {} + gateway_id: Optional[str] = None + auto_schedule: bool = False + # load-mode fields + concurrent_users: int = 1 + iterations_per_user: int = 1 + delay_between_submissions_ms: int = 0 + + +class LoadConfig(BaseModel): + scenarios: List[ScenarioConfig] + + +# --------------------------------------------------------------------------- +# Job result dataclass +# --------------------------------------------------------------------------- + +@dataclass +class JobResult: + job_index: int + scenario_name: str + experiment_id: Optional[str] + status: str + success: bool + submit_time: float = 0.0 + finish_time: float = 0.0 + error: Optional[str] = None + + @property + def elapsed_seconds(self) -> float: + if self.finish_time and self.submit_time: + return self.finish_time - self.submit_time + return 0.0 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _get_state_value(status) -> tuple: + """Return (int_value, name_str, ExperimentState) from any status object.""" + if isinstance(status, ExperimentState): + return status.value, status.name, status + if hasattr(status, 'state'): + state = status.state + if isinstance(state, ExperimentState): + return state.value, state.name, state + if hasattr(state, 'value'): + try: + enum = ExperimentState(state.value) + return state.value, state.name if hasattr(state, 'name') else str(state), enum + except (ValueError, TypeError): + pass + val = status.value if hasattr(status, 'value') else None + name = status.name if hasattr(status, 'name') else str(status) + if val is not None: + try: + return val, name, ExperimentState(val) + except (ValueError, TypeError): + pass + return None, name, ExperimentState.FAILED + + +_TERMINAL = frozenset([ + ExperimentState.COMPLETED.value, + ExperimentState.CANCELED.value, + ExperimentState.FAILED.value, +]) + + +def prepare_inputs( + input_files: Dict[str, Union[str, List[str]]], + data_inputs: Dict[str, Union[str, int, float]], +) -> dict: + """Build the experiment_inputs dict expected by AiravataOperator.launch_experiment.""" + inputs: dict = {} + for name, paths in (input_files or {}).items(): + if isinstance(paths, list): + inputs[name] = {"type": "uri[]", "value": [str(Path(p).resolve()) for p in paths]} + else: + inputs[name] = {"type": "uri", "value": str(Path(paths).resolve())} + for name, value in (data_inputs or {}).items(): + if isinstance(value, int): + inputs[name] = {"type": "int", "value": value} + elif isinstance(value, float): + inputs[name] = {"type": "float", "value": value} + else: + inputs[name] = {"type": "string", "value": str(value)} + if not inputs: + inputs = {"__no_inputs__": {"type": "string", "value": ""}} + return inputs + + +def monitor_experiment(operator: AiravataOperator, experiment_id: str, check_interval: int = 30) -> ExperimentState: + """Poll until the experiment reaches a terminal state. Returns final ExperimentState.""" + max_checks = 3600 + count = 0 + initial_interval = min(check_interval, 5) + while count < max_checks: + try: + status = operator.get_experiment_status(experiment_id) + val, name, enum = _get_state_value(status) + is_terminal = (val in _TERMINAL) if val is not None else (name in {'COMPLETED', 'CANCELED', 'FAILED'}) + if is_terminal: + return enum + except Exception as exc: + logger.warning("Status check error for %s (attempt %d): %s", experiment_id, count, exc) + if count > 10: + logger.error("Too many failures monitoring %s; assuming FAILED", experiment_id) + return ExperimentState.FAILED + sleep = initial_interval if count < 6 else check_interval + time.sleep(sleep) + count += 1 + logger.error("Monitoring timeout for %s; assuming FAILED", experiment_id) + return ExperimentState.FAILED + + +# --------------------------------------------------------------------------- +# Core launch function +# --------------------------------------------------------------------------- + +def launch_experiment( + access_token: str, + experiment_name: str, + project_name: str, + application_name: str, + computation_resource_name: str, + queue_name: str, + node_count: int, + cpu_count: int, + walltime: int, + group_name: str = "Default", + input_storage_host: Optional[str] = None, + output_storage_host: Optional[str] = None, + input_files: Optional[Dict] = None, + data_inputs: Optional[Dict] = None, + gateway_id: Optional[str] = None, + auto_schedule: bool = False, + do_monitor: bool = True, +) -> dict: + """Create and launch one experiment; return a result dict.""" + operator = AiravataOperator(access_token=access_token) + inputs = prepare_inputs(input_files or {}, data_inputs or {}) + + launch_state = operator.launch_experiment( + experiment_name=experiment_name, + project=project_name, + app_name=application_name, + inputs=inputs, + computation_resource_name=computation_resource_name, + queue_name=queue_name, + node_count=node_count, + cpu_count=cpu_count, + walltime=walltime, + group=group_name, + gateway_id=gateway_id, + input_sr_host=input_storage_host, + output_sr_host=output_storage_host, + auto_schedule=auto_schedule, + ) + + result = { + "experiment_id": launch_state.experiment_id, + "process_id": launch_state.process_id, + "experiment_dir": launch_state.experiment_dir, + "storage_host": launch_state.sr_host, + "mount_point": str(launch_state.mount_point), + } + + if do_monitor: + monitor_experiment(operator, launch_state.experiment_id) + + return result + + +# --------------------------------------------------------------------------- +# Worker used by batch / load modes +# --------------------------------------------------------------------------- + +def _run_job( + job_index: int, + scenario: ScenarioConfig, + access_token: str, + name_suffix: str = "", +) -> JobResult: + submit_time = time.time() + exp_name = f"{scenario.experiment_name}{name_suffix}" + try: + result = launch_experiment( + access_token=access_token, + experiment_name=exp_name, + project_name=scenario.project_name, + application_name=scenario.application_name, + computation_resource_name=scenario.computation_resource_name, + queue_name=scenario.queue_name, + node_count=scenario.node_count, + cpu_count=scenario.cpu_count, + walltime=scenario.walltime, + group_name=scenario.group_name, + input_storage_host=scenario.input_storage_host, + output_storage_host=scenario.output_storage_host, + input_files=scenario.input_files or None, + data_inputs=scenario.data_inputs or None, + gateway_id=scenario.gateway_id, + auto_schedule=scenario.auto_schedule, + do_monitor=True, + ) + # Determine final state + operator = AiravataOperator(access_token=access_token) + status = operator.get_experiment_status(result["experiment_id"]) + val, name, _ = _get_state_value(status) + success = val == ExperimentState.COMPLETED.value + return JobResult( + job_index=job_index, + scenario_name=scenario.name, + experiment_id=result["experiment_id"], + status=name, + success=success, + submit_time=submit_time, + finish_time=time.time(), + ) + except Exception as exc: + import traceback + return JobResult( + job_index=job_index, + scenario_name=scenario.name, + experiment_id=None, + status="ERROR", + success=False, + submit_time=submit_time, + finish_time=time.time(), + error=f"{exc}\n{traceback.format_exc()}", + ) + + +def _run_with_progress( + jobs: List[tuple], # list of (job_index, scenario, suffix) + access_token: str, + max_workers: int, + description: str, +) -> List[JobResult]: + """Submit jobs with a Rich progress bar. Returns results sorted by job_index.""" + results: List[JobResult] = [] + log_buf: deque = deque(maxlen=50) + + class _BufHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + log_buf.append(self.format(record)) + + handler = _BufHandler() + handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s')) + logging.root.addHandler(handler) + + progress = Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TextColumn("•"), + TextColumn("{task.completed}/{task.total}"), + TimeElapsedColumn(), + console=console, + ) + task_id = progress.add_task(description, total=len(jobs)) + layout = Layout() + layout.split_column(Layout(name="logs"), Layout(progress, name="progress", size=3)) + + def _refresh(): + lines = list(log_buf)[-20:] or ["No logs yet..."] + layout["logs"].update(Panel("\n".join(lines), title="Logs (latest)", border_style="blue")) + return layout + + try: + with Live(_refresh(), console=console, refresh_per_second=4, screen=True) as live: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + pending: dict = {} + queue = list(jobs) + # seed initial batch + while queue and len(pending) < max_workers: + idx, scen, suffix = queue.pop(0) + f = executor.submit(_run_job, idx, scen, access_token, suffix) + pending[f] = idx + + while pending or queue: + done = [f for f in pending if f.done()] + for f in done: + pending.pop(f) + try: + results.append(f.result()) + except Exception as exc: + results.append(JobResult( + job_index=-1, scenario_name="?", + experiment_id=None, status="ERROR", + success=False, error=str(exc), + )) + progress.update(task_id, advance=1) + + while queue and len(pending) < max_workers: + idx, scen, suffix = queue.pop(0) + f = executor.submit(_run_job, idx, scen, access_token, suffix) + pending[f] = idx + + live.update(_refresh()) + if not done and pending: + time.sleep(1) + finally: + logging.root.removeHandler(handler) + + results.sort(key=lambda r: r.job_index) + return results + + +def _write_csv(results: List[JobResult], path: str) -> None: + with open(path, "w", newline="") as fh: + w = csv.writer(fh) + w.writerow(["job_index", "scenario_name", "experiment_id", "status", "success", "elapsed_seconds", "error"]) + for r in results: + w.writerow([r.job_index, r.scenario_name, r.experiment_id, r.status, + r.success, f"{r.elapsed_seconds:.2f}", r.error or ""]) + console.print(f"[green]Metrics written to {path}[/green]") + + +def _print_summary(results: List[JobResult], label: str) -> None: + total = len(results) + ok = sum(1 for r in results if r.success) + console.print(f"\n{'='*60}") + console.print(f"{label}: {total} jobs") + console.print(f" Successful : {ok}/{total}") + console.print(f" Failed : {total - ok}/{total}") + for r in results: + sym = "[green]✓[/green]" if r.success else "[red]✗[/red]" + console.print(f" {sym} [{r.job_index}] {r.scenario_name} — {r.status}" + + (f" ({r.experiment_id})" if r.experiment_id else "")) + console.print('='*60) + + +# --------------------------------------------------------------------------- +# CLI sub-commands +# --------------------------------------------------------------------------- + +def cmd_single(args: argparse.Namespace) -> None: + access_token = AuthContext.get_access_token() + result = launch_experiment( + access_token=access_token, + experiment_name=args.experiment_name, + project_name=args.project, + application_name=args.application, + computation_resource_name=args.resource, + queue_name=args.queue, + node_count=args.node_count, + cpu_count=args.cpu_count, + walltime=args.walltime, + group_name=args.group, + input_storage_host=args.input_storage_host, + output_storage_host=args.output_storage_host, + do_monitor=not args.no_monitor, + ) + console.print("\n[bold green]Experiment launched successfully![/bold green]") + for k, v in result.items(): + console.print(f" {k}: {v}") + + +def cmd_batch(args: argparse.Namespace) -> None: + with open(args.config) as fh: + raw = yaml.safe_load(fh) + cfg = LoadConfig(**raw) + + scenario_map = {s.name: s for s in cfg.scenarios} + if args.scenario not in scenario_map: + console.print(f"[red]Scenario '{args.scenario}' not found in config.[/red]") + sys.exit(1) + scenario = scenario_map[args.scenario] + + access_token = AuthContext.get_access_token() + n = args.copies + jobs = [(i, scenario, f"-job{i}") for i in range(n)] + results = _run_with_progress(jobs, access_token, max_workers=args.max_concurrent, + description=f"Batch: {scenario.name}") + _print_summary(results, f"Batch '{scenario.name}'") + if args.csv: + _write_csv(results, args.csv) + + +def cmd_load(args: argparse.Namespace) -> None: + with open(args.config) as fh: + raw = yaml.safe_load(fh) + cfg = LoadConfig(**raw) + + access_token = AuthContext.get_access_token() + all_jobs: List[tuple] = [] + job_index = 0 + for scenario in cfg.scenarios: + total = scenario.concurrent_users * scenario.iterations_per_user + for i in range(total): + all_jobs.append((job_index, scenario, f"-u{i // scenario.iterations_per_user}-iter{i % scenario.iterations_per_user}")) + job_index += 1 + if scenario.delay_between_submissions_ms > 0 and i < total - 1: + time.sleep(scenario.delay_between_submissions_ms / 1000.0) + + max_workers = max(s.concurrent_users for s in cfg.scenarios) + results = _run_with_progress(all_jobs, access_token, max_workers=max_workers, + description="Load test") + _print_summary(results, "Load test") + if args.csv: + _write_csv(results, args.csv) + + +# --------------------------------------------------------------------------- +# Argument parsing +# --------------------------------------------------------------------------- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="load_client.py", + description="Airavata experiment launcher and load tester", + ) + sub = parser.add_subparsers(dest="command", required=True) + + # single + p_single = sub.add_parser("single", help="Create and launch one experiment") + p_single.add_argument("--experiment-name", required=True) + p_single.add_argument("--project", required=True) + p_single.add_argument("--application", required=True) + p_single.add_argument("--resource", required=True) + p_single.add_argument("--queue", required=True) + p_single.add_argument("--node-count", type=int, default=1) + p_single.add_argument("--cpu-count", type=int, default=1) + p_single.add_argument("--walltime", type=int, default=30) + p_single.add_argument("--group", default="Default") + p_single.add_argument("--input-storage-host") + p_single.add_argument("--output-storage-host") + p_single.add_argument("--no-monitor", action="store_true", + help="Submit without waiting for completion") + p_single.set_defaults(func=cmd_single) + + # batch + p_batch = sub.add_parser("batch", help="Launch N copies of a scenario") + p_batch.add_argument("--config", required=True, help="Path to load-config.yml") + p_batch.add_argument("--scenario", required=True, help="Scenario name in config") + p_batch.add_argument("--copies", type=int, default=10) + p_batch.add_argument("--max-concurrent", type=int, default=5) + p_batch.add_argument("--csv", metavar="FILE", help="Write metrics CSV to FILE") + p_batch.set_defaults(func=cmd_batch) + + # load + p_load = sub.add_parser("load", help="Run all scenarios from config") + p_load.add_argument("--config", required=True, help="Path to load-config.yml") + p_load.add_argument("--csv", metavar="FILE", help="Write metrics CSV to FILE") + p_load.set_defaults(func=cmd_load) + + return parser + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/dev-tools/load-client/pom.xml b/dev-tools/load-client/pom.xml deleted file mode 100644 index a8655b3113..0000000000 --- a/dev-tools/load-client/pom.xml +++ /dev/null @@ -1,99 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <parent> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata</artifactId> - <version>0.21-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>load-client</artifactId> - <name>Airavata Load Testing Client</name> - <description>Puts a load to Airavata through API</description> - <url>http://airavata.apache.org/</url> - <packaging>jar</packaging> - <version>0.21-SNAPSHOT</version> - - <dependencies> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-api</artifactId> - <version>0.21-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.keycloak</groupId> - <artifactId>keycloak-admin-client</artifactId> - <version>26.0.5</version> - <exclusions> - <exclusion> - <groupId>org.eclipse.angus</groupId> - <artifactId>angus-mail</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.keycloak</groupId> - <artifactId>keycloak-authz-client</artifactId> - <version>26.0.5</version> - </dependency> - <dependency> - <groupId>org.yaml</groupId> - <artifactId>snakeyaml</artifactId> - <version>2.4</version> - </dependency> - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - <version>1.9.0</version> - </dependency> - <dependency> - <groupId>org.bouncycastle</groupId> - <artifactId>bcpkix-jdk18on</artifactId> - <version>1.81</version> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>load-client-distribution-package</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <tarLongFileMode>posix</tarLongFileMode> - <finalName>load-client-${project.version}</finalName> - <descriptors> - <descriptor>src/main/assembly/load-client-bin-assembly.xml</descriptor> - </descriptors> - <attach>false</attach> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> diff --git a/dev-tools/load-client/requirements.txt b/dev-tools/load-client/requirements.txt new file mode 100644 index 0000000000..e071117edf --- /dev/null +++ b/dev-tools/load-client/requirements.txt @@ -0,0 +1,3 @@ +pydantic>=2.5 +rich>=13.0 +pyyaml>=6.0 diff --git a/dev-tools/load-client/src/main/assembly/load-client-bin-assembly.xml b/dev-tools/load-client/src/main/assembly/load-client-bin-assembly.xml deleted file mode 100644 index 646f5d9005..0000000000 --- a/dev-tools/load-client/src/main/assembly/load-client-bin-assembly.xml +++ /dev/null @@ -1,66 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.2.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.2.0 http://maven.apache.org/xsd/assembly-2.2.0.xsd"> - <id>bin</id> - <includeBaseDirectory>true</includeBaseDirectory> - <baseDirectory>load-client</baseDirectory> - <formats> - <format>tar.gz</format> - </formats> - - <fileSets> - <fileSet> - <directory>src/main/resources/bin</directory> - <outputDirectory>bin</outputDirectory> - <fileMode>777</fileMode> - <includes> - <include>*.sh</include> - </includes> - </fileSet> - <fileSet> - <directory>../../keystores</directory> - <outputDirectory>bin</outputDirectory> - <fileMode>777</fileMode> - <includes> - <include>airavata.p12</include> - </includes> - </fileSet> - <fileSet> - <directory>src/main/resources/conf</directory> - <outputDirectory>conf</outputDirectory> - <includes> - <include>load-config.yml</include> - </includes> - </fileSet> - </fileSets> - - <dependencySets> - <dependencySet> - <useProjectArtifact>true</useProjectArtifact> - <outputDirectory>lib</outputDirectory> - <includes> - <include>*:*:jar</include> - </includes> - </dependencySet> - </dependencySets> -</assembly> \ No newline at end of file diff --git a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Authenticator.java b/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Authenticator.java deleted file mode 100644 index bd4310cb1a..0000000000 --- a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Authenticator.java +++ /dev/null @@ -1,69 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.tools.load; - -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.util.HashMap; -import java.util.Map; -import org.apache.airavata.model.security.AuthzToken; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.conn.ssl.TrustSelfSignedStrategy; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.ssl.SSLContextBuilder; -import org.keycloak.authorization.client.AuthzClient; -import org.keycloak.authorization.client.Configuration; -import org.keycloak.representations.AccessTokenResponse; - -public class Authenticator { - - public static AuthzToken getAuthzToken( - String userName, - String password, - String gateway, - String keycloakUrl, - String keycloakClientId, - String keycloakClientSecret) - throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException { - - Map<String, Object> clientCredentials = new HashMap<>(); - clientCredentials.put("secret", keycloakClientSecret); - SSLContextBuilder builder = new SSLContextBuilder(); - builder.loadTrustMaterial(null, new TrustSelfSignedStrategy()); - SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(builder.build()); - CloseableHttpClient httpclient = - HttpClients.custom().setSSLSocketFactory(sslsf).build(); - - Configuration configuration = - new Configuration(keycloakUrl, gateway, keycloakClientId, clientCredentials, httpclient); - AuthzClient keycloakClient = AuthzClient.create(configuration); - AccessTokenResponse accessToken = keycloakClient.obtainAccessToken(userName, password); - - AuthzToken authzToken = new AuthzToken(); - Map<String, String> claims = new HashMap<>(); - claims.put("gatewayID", gateway); - claims.put("userName", userName); - authzToken.setAccessToken(accessToken.getToken()); - authzToken.setClaimsMap(claims); - return authzToken; - } -} diff --git a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Configuration.java b/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Configuration.java deleted file mode 100644 index d3c9ab2788..0000000000 --- a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Configuration.java +++ /dev/null @@ -1,250 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.tools.load; - -import java.util.ArrayList; -import java.util.List; -import org.apache.airavata.model.security.AuthzToken; - -public class Configuration { - - private String userId; - - private String gatewayId; - private String projectId; - private String applicationInterfaceId; - private String computeResourceId; - private String storageResourceId; - private String keycloakUrl; - private String keycloakClientId; - private String keycloakClientSecret; - - private String experimentBaseName; - - private String queue; - private int wallTime; - private int cpuCount; - private int nodeCount; - private int physicalMemory; - - private int concurrentUsers; - private int iterationsPerUser; - private int randomMSDelayWithinSubmissions; - - private AuthzToken authzToken; - - public AuthzToken getAuthzToken() throws Exception { - - if (authzToken == null) { - System.out.print("Enter password for user " + getUserId() + " in gateway " + getGatewayId() + " : "); - String pw = new String(System.console().readPassword()); - authzToken = Authenticator.getAuthzToken( - getUserId(), - pw, - getGatewayId(), - getKeycloakUrl(), - getKeycloakClientId(), - getKeycloakClientSecret()); - } - return authzToken; - } - - private List<Input> inputs = new ArrayList<>(); - - public String getUserId() { - return userId; - } - - public void setUserId(String userId) { - this.userId = userId; - } - - public String getGatewayId() { - return gatewayId; - } - - public void setGatewayId(String gatewayId) { - this.gatewayId = gatewayId; - } - - public String getProjectId() { - return projectId; - } - - public void setProjectId(String projectId) { - this.projectId = projectId; - } - - public String getApplicationInterfaceId() { - return applicationInterfaceId; - } - - public void setApplicationInterfaceId(String applicationInterfaceId) { - this.applicationInterfaceId = applicationInterfaceId; - } - - public String getComputeResourceId() { - return computeResourceId; - } - - public void setComputeResourceId(String computeResourceId) { - this.computeResourceId = computeResourceId; - } - - public String getStorageResourceId() { - return storageResourceId; - } - - public void setStorageResourceId(String storageResourceId) { - this.storageResourceId = storageResourceId; - } - - public String getExperimentBaseName() { - return experimentBaseName; - } - - public void setExperimentBaseName(String experimentBaseName) { - this.experimentBaseName = experimentBaseName; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public int getWallTime() { - return wallTime; - } - - public void setWallTime(int wallTime) { - this.wallTime = wallTime; - } - - public int getCpuCount() { - return cpuCount; - } - - public void setCpuCount(int cpuCount) { - this.cpuCount = cpuCount; - } - - public int getNodeCount() { - return nodeCount; - } - - public void setNodeCount(int nodeCount) { - this.nodeCount = nodeCount; - } - - public int getPhysicalMemory() { - return physicalMemory; - } - - public void setPhysicalMemory(int physicalMemory) { - this.physicalMemory = physicalMemory; - } - - public int getConcurrentUsers() { - return concurrentUsers; - } - - public void setConcurrentUsers(int concurrentUsers) { - this.concurrentUsers = concurrentUsers; - } - - public int getIterationsPerUser() { - return iterationsPerUser; - } - - public void setIterationsPerUser(int iterationsPerUser) { - this.iterationsPerUser = iterationsPerUser; - } - - public int getRandomMSDelayWithinSubmissions() { - return randomMSDelayWithinSubmissions; - } - - public void setRandomMSDelayWithinSubmissions(int randomMSDelayWithinSubmissions) { - this.randomMSDelayWithinSubmissions = randomMSDelayWithinSubmissions; - } - - public List<Input> getInputs() { - return inputs; - } - - public void setInputs(List<Input> inputs) { - this.inputs = inputs; - } - - public static class Input { - private String name; - private String value; - - public Input() {} - - public Input(String name, String value) { - this.name = name; - this.value = value; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getValue() { - return value; - } - - public void setValue(String value) { - this.value = value; - } - } - - public String getKeycloakUrl() { - return keycloakUrl; - } - - public void setKeycloakUrl(String keycloakUrl) { - this.keycloakUrl = keycloakUrl; - } - - public String getKeycloakClientId() { - return keycloakClientId; - } - - public void setKeycloakClientId(String keycloakClientId) { - this.keycloakClientId = keycloakClientId; - } - - public String getKeycloakClientSecret() { - return keycloakClientSecret; - } - - public void setKeycloakClientSecret(String keycloakClientSecret) { - this.keycloakClientSecret = keycloakClientSecret; - } -} diff --git a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Configurations.java b/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Configurations.java deleted file mode 100644 index 05cb523688..0000000000 --- a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/Configurations.java +++ /dev/null @@ -1,52 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.tools.load; - -import java.util.List; - -public class Configurations { - private String apiHost; - private int apiPort; - private List<Configuration> configurations; - - public List<Configuration> getConfigurations() { - return configurations; - } - - public void setConfigurations(List<Configuration> configurations) { - this.configurations = configurations; - } - - public String getApiHost() { - return apiHost; - } - - public void setApiHost(String apiHost) { - this.apiHost = apiHost; - } - - public int getApiPort() { - return apiPort; - } - - public void setApiPort(int apiPort) { - this.apiPort = apiPort; - } -} diff --git a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/LoadClient.java b/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/LoadClient.java deleted file mode 100644 index cfa5dffa48..0000000000 --- a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/LoadClient.java +++ /dev/null @@ -1,180 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.tools.load; - -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletionService; -import java.util.concurrent.Future; -import org.apache.airavata.api.Airavata; -import org.apache.airavata.common.util.AiravataClientFactory; -import org.apache.airavata.common.config.ServerSettings; -import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; -import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.Options; -import org.yaml.snakeyaml.Yaml; - -public class LoadClient { - - private String privateKeyFile = System.getProperty("user.home") + "/.ssh/id_rsa"; - private String publicKeyFile = System.getProperty("user.home") + "/.ssh/id_rsa.pub"; - private String passPhrase = null; - private String configFile; - - private final Map<String, StorageResourceManager> storageResourceManagerStore = new HashMap<>(); - private Configurations configurations; - - public void init() throws Exception { - - if (configFile == null) { - try (InputStream in = LoadClient.class.getResourceAsStream("/conf/load-config.yml")) { - Yaml yaml = new Yaml(); - configurations = yaml.loadAs(in, Configurations.class); - } - } else { - try (InputStream in = new FileInputStream(configFile)) { - Yaml yaml = new Yaml(); - configurations = yaml.loadAs(in, Configurations.class); - } - } - - // Making sure that all authzkeys are loaded - for (Configuration cfg : configurations.getConfigurations()) { - cfg.getAuthzToken(); - } - createStorageResourceManagers(configurations); - } - - public void start() throws Exception { - for (Configuration configuration : configurations.getConfigurations()) { - - UnitLoad unitLoad = new UnitLoad( - configurations.getApiHost(), - configurations.getApiPort(), - storageResourceManagerStore.get(configuration.getStorageResourceId()), - configuration.getAuthzToken()); - - StatusMonitor statusMonitor = new StatusMonitor( - configurations.getApiHost(), - configurations.getApiPort(), - configuration.getAuthzToken()); - - CompletionService<List<String>> completion = unitLoad.execute(configuration); - - List<String> allExperiments = new ArrayList<>(); - - for (int i = 0; i < configuration.getConcurrentUsers(); i++) { - Future<List<String>> experimentsPerUser = completion.take(); - allExperiments.addAll(experimentsPerUser.get()); - } - System.out.println("All experiments "); - System.out.println(allExperiments); - statusMonitor.monitorExperiments(allExperiments); - } - destroyStorageResourceManagers(); - System.out.println("Finished load "); - System.exit(0); - } - - private void createStorageResourceManagers(Configurations configurations) throws Exception { - - Airavata.Client airavataClient = AiravataClientFactory.createAiravataClient( - configurations.getApiHost(), - configurations.getApiPort(), - ServerSettings.isTLSEnabled()); - - for (Configuration configuration : configurations.getConfigurations()) { - String storageResourceId = configuration.getStorageResourceId(); - - if (!storageResourceManagerStore.containsKey(storageResourceId)) { - StorageResourceDescription storageResource = - airavataClient.getStorageResource(configuration.getAuthzToken(), storageResourceId); - - StoragePreference gatewayStoragePreference = airavataClient.getGatewayStoragePreference( - configuration.getAuthzToken(), configuration.getGatewayId(), storageResourceId); - - StorageResourceManager storageResourceManager = new StorageResourceManager( - gatewayStoragePreference, storageResource, privateKeyFile, publicKeyFile, passPhrase); - storageResourceManager.init(); - - storageResourceManagerStore.put(storageResourceId, storageResourceManager); - } - } - } - - private void destroyStorageResourceManagers() { - storageResourceManagerStore.values().forEach(StorageResourceManager::destroy); - } - - public static void main(String[] args) throws Exception { - - Options options = new Options(); - options.addOption("config", true, "Load configuration file in yaml format"); - options.addOption("apiHost", true, "API Server host name"); - options.addOption("apiPort", true, "API Server port"); - options.addOption( - "privateKeyPath", - true, - "SSH private key path to communicate with storage resources (Defaults to user private key in ~/.ssh/id_rsa)"); - options.addOption( - "publicKeyPath", - true, - "SSH public key path to communicate with storage resources (Defaults to user public key in ~/.ssh/id_rsa.pub)"); - options.addOption("passPhrase", true, "SSH private key pass phrase (if any)"); - - CommandLineParser parser = new GnuParser(); - CommandLine cmd = parser.parse(options, args); - - LoadClient loadClient = new LoadClient(); - - if (cmd.hasOption("config")) { - loadClient.configFile = cmd.getOptionValue("config"); - } else { - System.out.println("Error : Load config file should be specified"); - System.exit(0); - } - - if (cmd.hasOption("privateKeyPath")) { - loadClient.privateKeyFile = cmd.getOptionValue("privateKeyPath"); - } else { - System.out.println("Using default private key file " + loadClient.privateKeyFile); - } - - if (cmd.hasOption("publicKeyPath")) { - loadClient.publicKeyFile = cmd.getOptionValue("publicKeyPath"); - } else { - System.out.println("Using default public key file " + loadClient.publicKeyFile); - } - - if (cmd.hasOption("passPhrase")) { - loadClient.passPhrase = cmd.getOptionValue("passPhrase"); - } - - loadClient.init(); - loadClient.start(); - } -} diff --git a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/StatusMonitor.java b/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/StatusMonitor.java deleted file mode 100644 index eb09bf48a6..0000000000 --- a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/StatusMonitor.java +++ /dev/null @@ -1,149 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.tools.load; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.airavata.api.Airavata; -import org.apache.airavata.common.util.AiravataClientFactory; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.config.ServerSettings; -import org.apache.airavata.model.error.AiravataClientException; -import org.apache.airavata.model.experiment.ExperimentModel; -import org.apache.airavata.model.job.JobModel; -import org.apache.airavata.model.security.AuthzToken; -import org.apache.airavata.model.status.ExperimentState; -import org.apache.airavata.model.status.JobState; -import org.apache.thrift.TException; - -public class StatusMonitor { - - private String apiHost; - private int apiPort; - private AuthzToken authzToken; - - public StatusMonitor(String apiHost, int apiPort, AuthzToken authzToken) { - this.apiHost = apiHost; - this.apiPort = apiPort; - this.authzToken = authzToken; - } - - public void monitorExperiments(List<String> experiments) throws TException, ApplicationSettingsException { - - Map<String, JobModel> jobModelMap = new HashMap<>(); - Map<String, ExperimentModel> experimentModelMap = new HashMap<>(); - - Airavata.Client airavataClient; - long monitoringStartTime = System.currentTimeMillis(); - while (experiments.size() > jobModelMap.size()) { - System.out.println("Running a monitoring round...."); - airavataClient = AiravataClientFactory.createAiravataClient(apiHost, apiPort, ServerSettings.isTLSEnabled()); - - for (String experiment : experiments) { - - try { - if (jobModelMap.containsKey(experiment)) { - continue; - } - List<JobModel> jobDetails = airavataClient.getJobDetails(authzToken, experiment); - if (jobDetails.size() > 0) { - jobModelMap.put(experiment, jobDetails.get(0)); - } - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error while monitoring experiment " + experiment); - } - } - - System.out.println("Jobs " + jobModelMap.size() + "/" + experiments.size() + " submitted"); - try { - Thread.sleep(20 * 1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - airavataClient = AiravataClientFactory.createAiravataClient(apiHost, apiPort, ServerSettings.isTLSEnabled()); - - for (String experiment : experiments) { - experimentModelMap.put(experiment, airavataClient.getExperiment(authzToken, experiment)); - } - - long totalTime = 0; - long totalExperiments = 0; - - System.out.println("EXP ID,CREATE_TIME,LAUNCHED_TIME,EXECUTING_TIME,JOB_SUBMIT_TIME"); - List<String> lines = new ArrayList<>(); - for (String experiment : experiments) { - try { - - long expCreatedTime = experimentModelMap.get(experiment).getExperimentStatus().stream() - .filter(es -> es.getState() == ExperimentState.CREATED) - .findFirst() - .get() - .getTimeOfStateChange(); - - long expLaunchedTime = experimentModelMap.get(experiment).getExperimentStatus().stream() - .filter(es -> es.getState() == ExperimentState.LAUNCHED) - .findFirst() - .get() - .getTimeOfStateChange(); - - long expExecutedTime = experimentModelMap.get(experiment).getExperimentStatus().stream() - .filter(es -> es.getState() == ExperimentState.EXECUTING) - .findFirst() - .get() - .getTimeOfStateChange(); - - long jobSubmittedTime = jobModelMap.get(experiment).getJobStatuses().stream() - .filter(st -> st.getJobState() == JobState.SUBMITTED) - .findFirst() - .get() - .getTimeOfStateChange(); - - // long jobCompletedTime = jobModelMap.get(experiment) - // .getJobStatuses().stream().filter(st -> st.getJobState() == JobState.COMPLETE).findFirst() - // .get().getTimeOfStateChange(); - - // long expCompletedTime = experimentModelMap.get(experiment) - // .getExperimentStatus().stream().filter(es -> es.getState() == - // ExperimentState.COMPLETED).findFirst() - // .get().getTimeOfStateChange(); - lines.add(experiment + "," + expCreatedTime + "," + expLaunchedTime + "," + expExecutedTime + "," - + jobSubmittedTime); - totalTime += jobSubmittedTime - expExecutedTime; - totalExperiments++; - } catch (Exception e) { - System.out.println("Error parsing " + experiment + ". Err " + e.getMessage()); - e.printStackTrace(); - } - } - long monitoringStopTime = System.currentTimeMillis(); - - for (String line : lines) { - System.out.println(line); - } - System.out.println("All jobs completed"); - System.out.println("Average time " + (totalTime * 1.0 / totalExperiments) / 1000 + " s"); - System.out.println("Time for monitoring " + (monitoringStopTime - monitoringStartTime) / 1000 + "s"); - } -} diff --git a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/StorageResourceManager.java b/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/StorageResourceManager.java deleted file mode 100644 index 12b349d12f..0000000000 --- a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/StorageResourceManager.java +++ /dev/null @@ -1,128 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.tools.load; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; -import org.apache.airavata.compute.util.AgentException; -import org.apache.airavata.api.Airavata; -import org.apache.airavata.storage.util.SSHJStorageAdaptor; -import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; -import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; -import org.apache.airavata.model.data.replica.*; -import org.apache.airavata.model.security.AuthzToken; -import org.apache.thrift.TException; - -public class StorageResourceManager { - - private StoragePreference gatewayStoragePreference; - private StorageResourceDescription storageResource; - private String storageResourceId; - - private String privateKeyFile; - private String publicKeyFile; - private String passPhrase; - - private SSHJStorageAdaptor storageAdaptor = new SSHJStorageAdaptor(); - - public StorageResourceManager( - StoragePreference gatewayStoragePreference, - StorageResourceDescription storageResource, - String privateKeyFile, - String publicKeyFile, - String passPhrase) { - this.storageResourceId = storageResource.getStorageResourceId(); - this.storageResource = storageResource; - this.gatewayStoragePreference = gatewayStoragePreference; - this.privateKeyFile = privateKeyFile; - this.publicKeyFile = publicKeyFile; - this.passPhrase = passPhrase; - } - - public void init() throws IOException, AgentException { - storageAdaptor.init( - gatewayStoragePreference.getLoginUserName(), - storageResource.getHostName(), - 22, - readFile(publicKeyFile, Charset.defaultCharset()), - readFile(privateKeyFile, Charset.defaultCharset()), - passPhrase); - } - - public void destroy() { - storageAdaptor.destroy(); - } - - public String uploadInputFile( - Airavata.Client airavataClient, - String filePath, - String user, - String project, - String experiment, - String gatewayId) - throws TException, AgentException { - - String experimentDirectory = getExperimentDirectory(user, project, experiment); - - String uploadFilePath = experimentDirectory.concat(File.separator).concat(new File(filePath).getName()); - storageAdaptor.uploadFile(filePath, uploadFilePath); - - DataProductModel dataProductModel = new DataProductModel(); - dataProductModel.setGatewayId(gatewayId); - dataProductModel.setOwnerName(user); - dataProductModel.setDataProductType(DataProductType.FILE); - - DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel(); - replicaLocationModel.setStorageResourceId(storageResourceId); - replicaLocationModel.setReplicaName((new File(filePath).getName()) + " gateway data store copy"); - replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE); - replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT); - replicaLocationModel.setFilePath("file://" + storageResource.getHostName() + ":" + uploadFilePath); - - dataProductModel.setReplicaLocations(Collections.singletonList(replicaLocationModel)); - System.out.println("Registring " + uploadFilePath); - return airavataClient.registerDataProduct(new AuthzToken(""), dataProductModel); - } - - public void createExperimentDirectory(String user, String project, String experiment) throws AgentException { - String experimentDirectory = getExperimentDirectory(user, project, experiment); - storageAdaptor.createDirectory(experimentDirectory, true); - } - - private String getExperimentDirectory(String user, String project, String experiment) { - return gatewayStoragePreference - .getFileSystemRootLocation() - .concat(File.separator) - .concat(user) - .concat(File.separator) - .concat(project) - .concat(File.separator) - .concat(experiment); - } - - static String readFile(String path, Charset encoding) throws IOException { - byte[] encoded = Files.readAllBytes(Paths.get(path)); - return new String(encoded, encoding); - } -} diff --git a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/UnitLoad.java b/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/UnitLoad.java deleted file mode 100644 index 9bd06ab7e8..0000000000 --- a/dev-tools/load-client/src/main/java/org/apache/airavata/tools/load/UnitLoad.java +++ /dev/null @@ -1,186 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.tools.load; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.*; -import org.apache.airavata.compute.util.AgentException; -import org.apache.airavata.api.Airavata; -import org.apache.airavata.common.util.AiravataClientFactory; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.config.ServerSettings; -import org.apache.airavata.model.application.io.DataType; -import org.apache.airavata.model.application.io.InputDataObjectType; -import org.apache.airavata.model.experiment.ExperimentModel; -import org.apache.airavata.model.experiment.ExperimentType; -import org.apache.airavata.model.experiment.UserConfigurationDataModel; -import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; -import org.apache.airavata.model.security.AuthzToken; -import org.apache.thrift.TException; - -public class UnitLoad { - - private String apiHost; - private int apiPort; - private StorageResourceManager storageResourceManager; - private AuthzToken authzToken; - - public UnitLoad( - String apiHost, - int apiPort, - StorageResourceManager storageResourceManager, - AuthzToken authzToken) { - this.apiHost = apiHost; - this.apiPort = apiPort; - this.storageResourceManager = storageResourceManager; - this.authzToken = authzToken; - } - - public CompletionService<List<String>> execute(Configuration config) { - String randomUUID = UUID.randomUUID().toString(); - ExecutorService executorService = Executors.newFixedThreadPool(config.getConcurrentUsers()); - CompletionService<List<String>> completionService = new ExecutorCompletionService<>(executorService); - - for (int i = 0; i < config.getConcurrentUsers(); i++) { - completionService.submit(new Worker( - config, - randomUUID + "-" + i, - config.getIterationsPerUser(), - config.getRandomMSDelayWithinSubmissions())); - } - return completionService; - } - - public class Worker implements Callable<List<String>> { - - private final String id; - private final int iterations; - private final int delay; - private final Configuration config; - - public Worker(Configuration config, String id, int iterations, int delay) { - this.id = id; - this.iterations = iterations; - this.delay = delay; - this.config = config; - } - - @Override - public List<String> call() { - List<String> experiments = new ArrayList<>(); - for (int i = 0; i < iterations; i++) { - try { - double randomDouble = Math.random(); - randomDouble = randomDouble * delay + 1; - long randomLong = (long) randomDouble; - Thread.sleep(randomLong); - experiments.add(submitExperiment(config, id + "-" + i)); - } catch (TException | ApplicationSettingsException | AgentException | InterruptedException e) { - e.printStackTrace(); - } - } - return experiments; - } - } - - private String submitExperiment(Configuration config, String suffix) throws TException, AgentException, ApplicationSettingsException { - - String experimentName = config.getExperimentBaseName() + suffix; - - ExperimentModel experimentModel = new ExperimentModel(); - experimentModel.setExperimentName(experimentName); - experimentModel.setProjectId(config.getProjectId()); - experimentModel.setUserName(config.getUserId()); - experimentModel.setGatewayId(config.getGatewayId()); - experimentModel.setExecutionId(config.getApplicationInterfaceId()); - - ComputationalResourceSchedulingModel computationalResourceSchedulingModel = - new ComputationalResourceSchedulingModel(); - computationalResourceSchedulingModel.setQueueName(config.getQueue()); - computationalResourceSchedulingModel.setNodeCount(config.getNodeCount()); - computationalResourceSchedulingModel.setTotalCPUCount(config.getCpuCount()); - computationalResourceSchedulingModel.setWallTimeLimit(config.getWallTime()); - computationalResourceSchedulingModel.setTotalPhysicalMemory(config.getPhysicalMemory()); - computationalResourceSchedulingModel.setResourceHostId(config.getComputeResourceId()); - - UserConfigurationDataModel userConfigurationDataModel = new UserConfigurationDataModel(); - userConfigurationDataModel.setComputationalResourceScheduling(computationalResourceSchedulingModel); - userConfigurationDataModel.setAiravataAutoSchedule(false); - userConfigurationDataModel.setOverrideManualScheduledParams(false); - userConfigurationDataModel.setStorageId(config.getStorageResourceId()); - userConfigurationDataModel.setExperimentDataDir(config.getUserId() - .concat(File.separator) - .concat(config.getProjectId()) - .concat(File.separator) - .concat(experimentName)); - - experimentModel.setUserConfigurationData(userConfigurationDataModel); - - Airavata.Client airavataClient = AiravataClientFactory.createAiravataClient(apiHost, apiPort, ServerSettings.isTLSEnabled()); - - List<InputDataObjectType> applicationInputs = - airavataClient.getApplicationInputs(authzToken, config.getApplicationInterfaceId()); - List<InputDataObjectType> experimentInputs = new ArrayList<>(); - - storageResourceManager.createExperimentDirectory(config.getUserId(), config.getProjectId(), experimentName); - - for (InputDataObjectType inputDataObjectType : applicationInputs) { - - Optional<Configuration.Input> input = config.getInputs().stream() - .filter(inp -> inp.getName().equals(inputDataObjectType.getName())) - .findFirst(); - - if (input.isPresent()) { - if (inputDataObjectType.getType() == DataType.URI) { - String localFilePath = input.get().getValue(); - String uploadedPath = storageResourceManager.uploadInputFile( - airavataClient, - localFilePath, - config.getUserId(), - config.getProjectId(), - experimentName, - config.getGatewayId()); - inputDataObjectType.setValue(uploadedPath); - - } else if (inputDataObjectType.getType() == DataType.STRING) { - inputDataObjectType.setValue(input.get().getValue()); - } - } - experimentInputs.add(inputDataObjectType); - } - - experimentModel.setExperimentInputs(experimentInputs); - experimentModel.setExperimentOutputs( - airavataClient.getApplicationOutputs(authzToken, config.getApplicationInterfaceId())); - experimentModel.setExperimentType(ExperimentType.SINGLE_APPLICATION); - - String experimentId = airavataClient.createExperiment(authzToken, config.getGatewayId(), experimentModel); - - airavataClient.launchExperiment(authzToken, experimentId, config.getGatewayId()); - System.out.println(experimentId); - - ExperimentModel experiment = airavataClient.getExperiment(authzToken, experimentId); - return experimentId; - } -} diff --git a/dev-tools/load-client/src/main/resources/bin/load-client.sh b/dev-tools/load-client/src/main/resources/bin/load-client.sh deleted file mode 100644 index e10efac126..0000000000 --- a/dev-tools/load-client/src/main/resources/bin/load-client.sh +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Source the common environment and functions -. $(dirname $0)/setenv.sh - -# Client-specific configuration -MAIN_CLASS="org.apache.airavata.tools.load.LoadClient" -JAVA_OPTS="-Dairavata.config.dir=${AIRAVATA_HOME}/conf -Dairavata.home=${AIRAVATA_HOME}" - -# Parse client-specific arguments -while (($# > 0)); do - case "$1" in - -xdebug) - JAVA_OPTS+=" -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,address=8000" - shift - ;; - -h) - echo "Usage: load-client.sh" - echo "" - echo "command options:" - echo " -config Load configuration file in yml format" - echo " -apiHost API Server host name" - echo " -apiPort API Server port" - echo " -privateKeyPath SSH private key path to communicate with storage resources (Defaults to user private key in ~/.ssh/id_rsa)" - echo " -publicKeyPath SSH public key path to communicate with storage resources (Defaults to user public key in ~/.ssh/id_rsa.pub)" - echo " -passPhrase SSH private key pass phrase (if any)" - echo " -xdebug Start under JPDA debugger" - echo " -h Display this help and exit" - exit 0 - ;; - *) - # Pass all other arguments to the Java application - break - ;; - esac -done - -# Run the load client -java ${JAVA_OPTS} -classpath "${CLASSPATH}" ${MAIN_CLASS} "$@" diff --git a/dev-tools/load-client/src/main/resources/bin/setenv.sh b/dev-tools/load-client/src/main/resources/bin/setenv.sh deleted file mode 100644 index 9024de1d7b..0000000000 --- a/dev-tools/load-client/src/main/resources/bin/setenv.sh +++ /dev/null @@ -1,146 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Resolve symlinks to get the real script location -PRG="$0" -while [ -L "$PRG" ]; do - PRG=$(readlink "$PRG") -done -PRGDIR=$(dirname "$PRG") - -# Set AIRAVATA_HOME if not already set -[ -z "$AIRAVATA_HOME" ] && AIRAVATA_HOME=$(cd "$PRGDIR/.." && pwd) - -# Build CLASSPATH from all JAR files -CLASSPATH=$(printf "%s:" "$AIRAVATA_HOME"/lib/*.jar) -CLASSPATH=${CLASSPATH%:} # Remove trailing colon - -export AIRAVATA_HOME CLASSPATH - -# Common function to run Airavata services -# Usage: run_service <service_name> <main_class> <java_opts> -run_service() { - local SERVICE_NAME="$1" MAIN_CLASS="$2" JAVA_OPTS="$3" - # Export SERVICE_NAME as environment variable for log4j2 configuration - export SERVICE_NAME - local CWD="$PWD" PID_PATH_NAME="${AIRAVATA_HOME}/bin/pid-${SERVICE_NAME}" - local DEFAULT_LOG_FILE="${AIRAVATA_HOME}/logs/${SERVICE_NAME}.log" - local LOG_FILE="$DEFAULT_LOG_FILE" DAEMON_MODE=false EXTRA_ARGS="" - - # Help text - local HELP_TEXT="Usage: ${SERVICE_NAME}.sh - -command options: - -d Run in daemon mode - -xdebug Start ${SERVICE_NAME} under JPDA debugger - -log <LOG_FILE> Where to redirect stdout/stderr (defaults to $DEFAULT_LOG_FILE) - -h Display this help and exit - -Daemon mode commands (use with -d): - start Start server in daemon mode - stop Stop server running in daemon mode - restart Restart server in daemon mode" - - cd "${AIRAVATA_HOME}/bin" - - # Helper function to stop daemon process - stop_daemon() { - if [[ -f "$PID_PATH_NAME" ]]; then - local PID=$(cat "$PID_PATH_NAME") - echo "$SERVICE_NAME stopping..." - pkill -P "$PID" - kill "$PID" - - local retry=0 - while kill -0 "$PID" 2>/dev/null && ((retry++ < 20)); do - echo "[PID: $PID] Waiting for process to stop..." - sleep 1 - done - - if kill -0 "$PID" 2>/dev/null; then - echo "[PID: $PID] Forcefully killing non-responsive process..." - pkill -9 -P "$PID" - kill -9 "$PID" - fi - - echo "$SERVICE_NAME is now stopped." - rm "$PID_PATH_NAME" - return 0 - else - echo "$SERVICE_NAME is not running." - return 1 - fi - } - - # Helper function to start daemon process - start_daemon() { - echo "Starting $SERVICE_NAME ..." - if [[ ! -f "$PID_PATH_NAME" ]]; then - nohup java $JAVA_OPTS -classpath "$CLASSPATH" "$MAIN_CLASS" "$@" >"$LOG_FILE" 2>&1 & - echo $! >"$PID_PATH_NAME" - echo "$SERVICE_NAME now running: PID $(cat "$PID_PATH_NAME")" - else - echo "$SERVICE_NAME already running: PID $(cat "$PID_PATH_NAME")" - fi - } - - # Parse command arguments - while (($# > 0)); do - case "$1" in - -d) DAEMON_MODE=true ;; - -xdebug) JAVA_OPTS+=" -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,address=*:8000" ;; - -log) - shift - LOG_FILE="$1" - [[ "$LOG_FILE" != /* ]] && LOG_FILE="${CWD}/${LOG_FILE}" - ;; - start | stop | restart) - if [[ "$DAEMON_MODE" == true ]]; then - case "$1" in - start) start_daemon "$@" ;; - stop) stop_daemon ;; - restart) - stop_daemon - start_daemon "$@" - ;; - esac - exit 0 - else - EXTRA_ARGS+=" $1" - fi - ;; - -h) - echo "$HELP_TEXT" - exit 0 - ;; - *) EXTRA_ARGS+=" $1" ;; - esac - shift - done - - # Validate daemon mode usage - if [[ "$DAEMON_MODE" == true ]]; then - echo "Error: Daemon mode (-d) requires one of: start, stop, restart" - echo "Use -h for help" - exit 1 - fi - - # Run in foreground mode - java $JAVA_OPTS -classpath "$CLASSPATH" "$MAIN_CLASS" $EXTRA_ARGS -} diff --git a/dev-tools/load-client/src/main/resources/conf/load-config.yml b/dev-tools/load-client/src/main/resources/conf/load-config.yml deleted file mode 100644 index 6b403947c9..0000000000 --- a/dev-tools/load-client/src/main/resources/conf/load-config.yml +++ /dev/null @@ -1,27 +0,0 @@ -apiHost: apidev.scigap.org -apiPort: 9930 -configurations: - - experimentBaseName: "TestEcho" - userId: "dimuthu" - gatewayId: "seagrid" - projectId: "DefaultProject_7ac38275-0ca1-433a-ab6a-630c8c1df2ef" - applicationInterfaceId: "Echo_3f480d1f-ea86-4018-94bb-015423d66a1c" - computeResourceId: "bigred2.uits.iu.edu_ac140dca-3c88-46d8-b9ed-875d96ea6908" - storageResourceId: "pgadev.scigap.org_7ddf28fd-d503-4ff8-bbc5-3279a7c3b99e" - keycloakUrl: "https://iamdev.scigap.org/auth" - keycloakClientId: "pga" - keycloakClientSecret: "secret" - - inputs: - - name: "Input-to-Echo" - value: "Test" - - queue: "cpu" - wallTime: 60 - cpuCount: 2 - nodeCount: 1 - physicalMemory: 512 - - concurrentUsers: 1 - iterationsPerUser: 2 - randomMSDelayWithinSubmissions: 100
