This is an automated email from the ASF dual-hosted git repository.

yasithdev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cadb49f1f feat(python-sdk): add framework-agnostic experiment-orchestration + queue-settings helpers (#657)
7cadb49f1f is described below

commit 7cadb49f1f4636583de90a93e97c5480c8e2f552
Author: Yasith Jayawardana <[email protected]>
AuthorDate: Tue Jun 9 12:12:41 2026 -0400

    feat(python-sdk): add framework-agnostic experiment-orchestration + queue-settings helpers (#657)
    
    Relocate API-side experiment behavior out of the Django portal SDK
    (airavata_django_portal_sdk) into the new gRPC SDK as a framework-agnostic
    airavata_sdk.helpers package. No Django/DRF/request/Thrift dependencies;
    every function takes an AiravataClient plus explicit kwargs and routes
    backend access through the gRPC facades (client.research / client.storage /
    client.sharing).
    
    - helpers/experiment_orchestration.py: launch, clone, is_input_file, and the
      intermediate-output helpers (can_fetch_intermediate_output,
      get_intermediate_output_process_status, fetch_intermediate_output,
      get_intermediate_output_data_products). Repointed from
      request.airavata_client.* + the Django user_storage module onto the gRPC
      facades and proto snake_case field names. tmp input-file uploads are now
      moved in place (move_file + update_replica_location), preserving the
      data-product URI; clone-copy is implemented as download -> upload ->
      register_data_product since the storage facade has no copy primitive.
    - helpers/queue_settings.py: the queue-settings calculator registry +
      queue_settings_calculator decorator, relocated verbatim in behavior;
      calculate_queue_settings now forwards *args/**kwargs so existing callers
      keep working unchanged.
    
    setuptools auto-discovers airavata_sdk.helpers via the existing
    airavata_sdk* glob; no version bump.
---
 .../airavata_sdk/helpers/__init__.py               |   8 +
 .../helpers/experiment_orchestration.py            | 502 +++++++++++++++++++++
 .../airavata_sdk/helpers/queue_settings.py         |  76 ++++
 3 files changed, 586 insertions(+)

diff --git a/airavata-python-sdk/airavata_sdk/helpers/__init__.py b/airavata-python-sdk/airavata_sdk/helpers/__init__.py
new file mode 100644
index 0000000000..82f634a3a0
--- /dev/null
+++ b/airavata-python-sdk/airavata_sdk/helpers/__init__.py
@@ -0,0 +1,8 @@
+"""Framework-agnostic helpers relocated from the Django portal SDK.
+
+These modules carry behavior that used to live in
+``airavata_django_portal_sdk`` but contain no Django/DRF/Thrift dependencies.
+All backend access is routed through the gRPC facades exposed by
+:class:`airavata_sdk.client.AiravataClient` (``client.research``,
+``client.storage``, ``client.sharing``).
+"""
diff --git a/airavata-python-sdk/airavata_sdk/helpers/experiment_orchestration.py b/airavata-python-sdk/airavata_sdk/helpers/experiment_orchestration.py
new file mode 100644
index 0000000000..1e901c9750
--- /dev/null
+++ b/airavata-python-sdk/airavata_sdk/helpers/experiment_orchestration.py
@@ -0,0 +1,502 @@
+"""Framework-agnostic experiment orchestration helpers.
+
+Relocated (in behavior) from
+``airavata_django_portal_sdk.experiment_util`` (``api.py`` and
+``intermediate_output.py``) and repointed from Thrift
+(``request.airavata_client.*``) plus the Django ``user_storage`` module onto the
+gRPC facades exposed by :class:`airavata_sdk.client.AiravataClient`
+(``client.research``, ``client.storage``, ``client.sharing``).
+
+There is no Django, DRF, ``request`` object, or Thrift here. Every function
+takes an :class:`~airavata_sdk.client.AiravataClient` (named ``client``) plus
+explicit keyword arguments. ``experiment`` arguments are proto
+``ExperimentModel`` instances (snake_case field names).
+"""
+
+import logging
+import os
+from urllib.parse import unquote, urlparse
+
+# Importing ``airavata_sdk.generated`` puts the generated proto root on sys.path
+# so the absolute ``from org.apache.airavata...`` imports inside the stubs resolve.
+import airavata_sdk.generated  # noqa: F401
+from airavata_sdk.generated.org.apache.airavata.model.application.io import (
+    application_io_pb2,
+)
+from airavata_sdk.generated.org.apache.airavata.model.data.replica import (
+    replica_catalog_pb2,
+)
+from airavata_sdk.generated.org.apache.airavata.model.status import status_pb2
+from airavata_sdk.generated.org.apache.airavata.model.task import task_pb2
+
+# Module-level logger, named after this module.
+logger = logging.getLogger(__name__)
+
+# Directory (relative to the user's storage root) where freshly uploaded input
+# files land before they are moved into a launched experiment's data directory.
+# is_input_file() compares this name against a replica's parent directory name.
+TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
+
+# Permission name understood by the sharing service (it does
+# ResourcePermissionType.valueOf(permission_type) server-side).
+_WRITE_PERMISSION = "WRITE"
+
+# Terminal process states for intermediate-output fetching: a fetch process in
+# any of these states is treated as no longer in progress.
+_TERMINAL_PROCESS_STATES = (
+    status_pb2.PROCESS_STATE_CANCELED,
+    status_pb2.PROCESS_STATE_COMPLETED,
+    status_pb2.PROCESS_STATE_FAILED,
+)
+
+
+# ---------------------------------------------------------------------------
+# Replica / data-product helpers (pure, no backend calls)
+# ---------------------------------------------------------------------------
+
+def _gateway_data_store_replica(data_product):
+    """Pick the replica to treat as the gateway-data-store copy.
+
+    Returns the first replica whose category is GATEWAY_DATA_STORE. When no
+    replica carries that category, the first replica of any category is
+    returned instead; a data product without replicas yields None.
+    """
+    for candidate in data_product.replica_locations:
+        if candidate.replica_location_category == replica_catalog_pb2.GATEWAY_DATA_STORE:
+            return candidate
+    # No explicit GATEWAY_DATA_STORE replica: settle for the first one, if any.
+    if data_product.replica_locations:
+        return data_product.replica_locations[0]
+    return None
+
+
+def _replica_filesystem_path(replica):
+    """Extract the plain filesystem path from a replica's ``file_path``.
+
+    Mirrors the old portal's ``_get_replica_filepath``: a replica file path
+    may be stored as ``file://<host>:<path>`` and/or URL-encoded, so only the
+    path component of the parsed value is kept, and it is percent-decoded.
+
+    Returns None when ``replica`` is None or its ``file_path`` is empty.
+    """
+    raw_path = replica.file_path if replica is not None else None
+    if not raw_path:
+        return None
+    parsed = urlparse(raw_path)
+    return unquote(parsed.path)
+
+
+def is_input_file(data_product) -> bool:
+    """Tell whether a data product is a tmp input-file upload.
+
+    The decision is made purely from the DataProductModel proto, with no
+    backend call: the product's replica (GATEWAY_DATA_STORE preferred) is
+    resolved to a filesystem path, and the product counts as a tmp upload when
+    the directory directly containing that path is named
+    TMP_INPUT_FILE_UPLOAD_DIR (``tmp``).
+
+    Returns False when the product has no usable replica file path.
+    """
+    replica_path = _replica_filesystem_path(_gateway_data_store_replica(data_product))
+    if replica_path is None:
+        return False
+    containing_dir = os.path.dirname(replica_path)
+    # Only the immediate parent directory's name is compared.
+    return os.path.split(containing_dir)[1] == TMP_INPUT_FILE_UPLOAD_DIR
+
+
+# ---------------------------------------------------------------------------
+# Launch
+# ---------------------------------------------------------------------------
+
+def launch(client, experiment_id, *, username) -> None:
+    """Prepare an experiment's storage settings and inputs, then launch it.
+
+    Equivalent to the old ``experiment_util.api.launch`` non-remote-API branch.
+    Steps, in order:
+
+    1. load the experiment through ``client.research``;
+    2. set its storage resource ids and per-experiment data dir;
+    3. move any tmp input-file uploads into that data dir;
+    4. persist the modified experiment;
+    5. launch it in the client's gateway.
+
+    ``username`` is forwarded to the storage/data-dir setup step.
+    """
+    experiment_model = client.research.get_experiment(experiment_id)
+
+    # Mutate the loaded model in place before persisting it.
+    _set_storage_id_and_data_dir(client, experiment_model, username=username)
+    _move_tmp_input_file_uploads_to_data_dir(client, experiment_model)
+
+    client.research.update_experiment(experiment_id, experiment_model)
+    client.research.launch_experiment(experiment_id, gateway_id=client.gateway_id)
+
+
+def _set_storage_id_and_data_dir(client, experiment, *, username):
+    """Fill in the experiment's storage resource ids and data directory.
+
+    Both the input and output storage resource ids are set to the gateway's
+    default storage resource. If ``experiment_data_dir`` is already set, that
+    directory is (re)created on storage. Otherwise a directory named
+    "<project>/<experiment>" (each part sanitized) is created and recorded as
+    the experiment's data dir.
+
+    The experiment is modified in place; nothing is returned. ``username`` is
+    accepted but not used by this function.
+    """
+    storage_id = client.storage.get_default_storage_resource_id()
+
+    config = experiment.user_configuration_data
+    config.input_storage_resource_id = storage_id
+    config.output_storage_resource_id = storage_id
+
+    if config.experiment_data_dir:
+        # Ensure the directory exists on storage (the old code re-validated and
+        # created the absolute path via create_user_dir).
+        client.storage.create_dir(
+            config.experiment_data_dir, storage_resource_id=storage_id
+        )
+        return
+
+    project = client.research.get_project(experiment.project_id)
+    # Path convention mirrors the old create_user_dir(dir_names=(project.name,
+    # experiment_name)): "<project>/<experiment>" under the storage root.
+    # Names are sanitized for filesystem safety (spaces -> underscores).
+    requested_dir = "/".join(
+        [
+            _sanitize_path_component(project.name),
+            _sanitize_path_component(experiment.experiment_name),
+        ]
+    )
+    # TODO(sdk-consolidation): the old create_user_dir(create_unique=True)
+    # guaranteed a non-colliding directory by suffixing on collision. The
+    # storage facade's create_dir has no uniqueness guarantee, so two
+    # experiments with the same project+name share a data dir. A
+    # create_unique option on the storage facade would restore old behavior.
+    response = client.storage.create_dir(requested_dir, storage_resource_id=storage_id)
+    # Prefer the path reported by create_dir; fall back to the requested path.
+    config.experiment_data_dir = getattr(response, "created_path", "") or requested_dir
+
+
+def _sanitize_path_component(name):
+    """Sanitize a single path component for use in a storage path.
+
+    Surrounding whitespace is stripped and spaces become underscores. Because
+    the result is joined into a "/"-separated storage path as ONE component,
+    any "/" inside the name is also replaced with an underscore, and the
+    special names "." and ".." are rewritten so they cannot refer to the
+    current or parent directory.
+
+    ``None`` (or any falsy value) yields an empty string.
+    """
+    cleaned = (name or "").strip().replace(" ", "_")
+    # A separator inside the name would silently create extra path levels.
+    cleaned = cleaned.replace("/", "_")
+    # "." and ".." are directory references, not names.
+    if cleaned in (".", ".."):
+        cleaned = "_" * len(cleaned)
+    return cleaned
+
+
+def _move_tmp_input_file_uploads_to_data_dir(client, experiment):
+    """Relocate tmp input-file uploads referenced by the experiment's inputs.
+
+    Every URI input with a value, and every comma-separated entry of a
+    URI_COLLECTION input, is passed through _move_if_tmp_input_file_upload;
+    the input value is then rewritten with the returned URI(s). Inputs of
+    other types are left untouched. The experiment is modified in place.
+    """
+    config = experiment.user_configuration_data
+    data_dir = config.experiment_data_dir
+    # An empty storage id is passed on as None.
+    storage_id = config.input_storage_resource_id or None
+
+    for exp_input in experiment.experiment_inputs:
+        if exp_input.type == application_io_pb2.URI:
+            if not exp_input.value:
+                continue
+            exp_input.value = _move_if_tmp_input_file_upload(
+                client, exp_input.value, data_dir, storage_id
+            )
+        elif exp_input.type == application_io_pb2.URI_COLLECTION:
+            uris = exp_input.value.split(",") if exp_input.value else []
+            relocated = []
+            for uri in uris:
+                relocated.append(
+                    _move_if_tmp_input_file_upload(client, uri, data_dir, storage_id)
+                )
+            exp_input.value = ",".join(relocated)
+
+
+def _move_if_tmp_input_file_upload(client, data_product_uri, experiment_data_dir, storage_id):
+    """Move a tmp input-file upload into the data dir; return its (same) URI.
+
+    Reads the data product and, if it is a tmp upload, relocates the underlying
+    bytes and repoints its replica's file_path in place, preserving the
+    data-product URI (unlike the old portal, which created a copy + new URI;
+    the gRPC facade lets us keep the same product).
+
+    The file is only moved when the replica can be repointed afterwards: a
+    replica without a ``replica_id`` is left where it is (with a warning), so
+    the catalog never ends up referencing a path that no longer exists.
+
+    Args:
+        client: AiravataClient used for the research and storage facades.
+        data_product_uri: URI of the data product referenced by the input.
+        experiment_data_dir: destination directory for the moved file.
+        storage_id: storage resource id passed to ``move_file`` (may be None).
+
+    Returns:
+        ``data_product_uri`` unchanged, whether or not a move happened.
+    """
+    data_product = client.research.get_data_product(data_product_uri)
+    if not is_input_file(data_product):
+        return data_product_uri
+
+    replica = _gateway_data_store_replica(data_product)
+    source_path = _replica_filesystem_path(replica)
+    if source_path is None:
+        logger.warning("No replica file path for data product %s; skipping move", data_product_uri)
+        return data_product_uri
+
+    # Check this BEFORE touching storage: moving the bytes without being able
+    # to persist the new file_path would orphan the data product.
+    if not replica.replica_id:
+        logger.warning(
+            "Replica for data product %s has no replica_id; "
+            "leaving the file in place because its replica file_path "
+            "could not be repointed",
+            data_product_uri,
+        )
+        return data_product_uri
+
+    file_name = data_product.product_name or os.path.basename(source_path)
+    dest_path = _join_storage_path(experiment_data_dir, file_name)
+
+    client.storage.move_file(source_path, dest_path, storage_resource_id=storage_id)
+
+    # Repoint the replica's file_path to the new location and persist it.
+    replica.file_path = dest_path
+    # TODO(sdk-consolidation): update_replica_location persists the new
+    # file_path against the existing replica_id, preserving the data-product
+    # URI. Verify round-trip against a live backend during portal
+    # integration (research-service must honor a bare path as file_path).
+    client.research.update_replica_location(replica.replica_id, replica)
+    return data_product_uri
+
+
+def _join_storage_path(directory, name):
+    """Join a storage directory and a file name with exactly one "/".
+
+    Trailing slashes on ``directory`` are dropped before joining. An empty or
+    None ``directory`` returns ``name`` unchanged.
+    """
+    if directory:
+        return "/".join((directory.rstrip("/"), name))
+    return name
+
+
+# ---------------------------------------------------------------------------
+# Clone
+# ---------------------------------------------------------------------------
+
+def clone(client, experiment_id, *, username, project_id=None) -> str:
+    """Clone an experiment and return the cloned experiment's id.
+
+    Target project selection, in order of preference:
+
+    1. the given ``project_id``, if provided AND writeable by ``username``;
+    2. the source experiment's project, if writeable;
+    3. the first writeable project among the user's projects.
+
+    After cloning, the clone's input files are copied into fresh tmp uploads,
+    its data dir is cleared so a new one is created at launch time, and the
+    clone is persisted.
+
+    Args:
+        client: AiravataClient providing the research/storage/sharing facades.
+        experiment_id: id of the experiment to clone.
+        username: user on whose behalf write access is checked and copies are
+            registered.
+        project_id: optional preferred target project.
+
+    Raises:
+        Exception: if no writeable project can be found for the user.
+    """
+    experiment = client.research.get_experiment(experiment_id)
+
+    # Honor the requested project only when the user can actually write to it,
+    # as documented; otherwise fall back to the normal selection.
+    if project_id and _can_write(client, project_id, username=username):
+        target_project_id = project_id
+    else:
+        if project_id:
+            logger.warning(
+                "Requested project %s is not writeable by %s; selecting another project",
+                project_id,
+                username,
+            )
+        target_project_id = _get_writeable_project(client, experiment, username=username)
+
+    cloned_experiment_id = client.research.clone_experiment(
+        experiment_id,
+        new_experiment_name="Clone of {}".format(experiment.experiment_name),
+        new_experiment_project_id=target_project_id,
+    )
+    cloned_experiment = client.research.get_experiment(cloned_experiment_id)
+
+    # Create a copy of the experiment input files.
+    _copy_cloned_experiment_input_uris(client, cloned_experiment, username=username)
+
+    # Null out experiment_data_dir so a new one is created at launch time.
+    cloned_experiment.user_configuration_data.experiment_data_dir = ""
+    client.research.update_experiment(cloned_experiment.experiment_id, cloned_experiment)
+    return cloned_experiment_id
+
+
+def _get_writeable_project(client, experiment, *, username):
+    """Find a project id that ``username`` has WRITE access to.
+
+    The experiment's own project is tried first. Failing that, the user's
+    projects in the client's gateway are checked in the order returned, and
+    the first writeable one wins.
+
+    Raises:
+        Exception: when none of the candidate projects is writeable.
+    """
+    own_project_id = experiment.project_id
+    if _can_write(client, own_project_id, username=username):
+        return own_project_id
+
+    candidates = client.research.get_user_projects(
+        client.gateway_id, username, limit=-1, offset=0
+    )
+    # Lazily evaluated: stops at the first project that passes the check.
+    first_writeable = next(
+        (
+            candidate.project_id
+            for candidate in candidates
+            if _can_write(client, candidate.project_id, username=username)
+        ),
+        None,
+    )
+    if first_writeable is None:
+        raise Exception(
+            "Could not find writeable project for user {} in gateway {}".format(
+                username, client.gateway_id
+            )
+        )
+    return first_writeable
+
+
+def _can_write(client, entity_id, *, username) -> bool:
+    """Ask the sharing facade whether the user has WRITE access to an entity.
+
+    Delegates to ``client.sharing.user_has_access`` with the module's WRITE
+    permission name and returns its result. Per the original note, the sharing
+    service evaluates access for the authenticated request user, and
+    ``username`` is passed through as the request's user_id for completeness.
+    """
+    has_access = client.sharing.user_has_access(entity_id, username, _WRITE_PERMISSION)
+    return has_access
+
+
+def _copy_cloned_experiment_input_uris(client, cloned_experiment, *, username):
+    """Give a freshly cloned experiment its own copies of its input files.
+
+    For each URI / URI_COLLECTION input that has a value, the referenced data
+    product(s) are copied via _copy_experiment_input_uri and the input value
+    is rewritten with the new URI(s). When a copy cannot be made (the helper
+    returns None), a URI input is reset to the empty string and a
+    URI_COLLECTION entry is left out, matching the old portal. The experiment
+    is modified in place.
+    """
+    for exp_input in cloned_experiment.experiment_inputs:
+        if not exp_input.value:
+            continue
+
+        if exp_input.type == application_io_pb2.URI:
+            copied_uri = _copy_experiment_input_uri(client, exp_input.value, username=username)
+            if copied_uri is not None:
+                exp_input.value = copied_uri
+            else:
+                logger.warning("Setting cloned input %s to null", exp_input.name)
+                exp_input.value = ""
+        elif exp_input.type == application_io_pb2.URI_COLLECTION:
+            source_uris = exp_input.value.split(",") if exp_input.value else []
+            kept_uris = []
+            for source_uri in source_uris:
+                copied_uri = _copy_experiment_input_uri(client, source_uri, username=username)
+                if copied_uri is not None:
+                    kept_uris.append(copied_uri)
+                else:
+                    logger.warning("Omitting a cloned input value for %s", exp_input.name)
+            exp_input.value = ",".join(kept_uris)
+
+
+def _copy_experiment_input_uri(client, data_product_uri, *, username):
+    """Duplicate the file behind a data product as a new tmp upload.
+
+    The storage facade has no copy primitive, so the source bytes are
+    downloaded and uploaded again under the ``tmp`` directory, and a new data
+    product pointing at that upload is registered (the old portal's
+    ``_copy_input_file`` -> ``_save_copy_of_data_product``).
+
+    Returns:
+        The URI of the newly registered data product, or None when the source
+        product has no replica path or its file does not exist on storage.
+
+    NOTE(review): the destination is always ``tmp/<file_name>``, so two copies
+    with the same file name appear to target the same path. Confirm whether
+    ``upload_file`` de-duplicates names, or whether a unique name is needed.
+    """
+    source_product = client.research.get_data_product(data_product_uri)
+    source_replica = _gateway_data_store_replica(source_product)
+    source_path = _replica_filesystem_path(source_replica)
+    storage_id = source_replica.storage_resource_id if source_replica is not None else None
+    # Storage calls receive None rather than an empty storage id.
+    storage_arg = storage_id or None
+
+    source_exists = source_path is not None and client.storage.file_exists(
+        source_path, storage_resource_id=storage_arg
+    )
+    if not source_exists:
+        logger.warning("Could not find file for source data product %s", data_product_uri)
+        return None
+
+    downloaded = client.storage.download_file(source_path, storage_resource_id=storage_arg)
+    file_name = source_product.product_name or os.path.basename(source_path)
+    dest_path = _join_storage_path(TMP_INPUT_FILE_UPLOAD_DIR, file_name)
+    content_type = source_product.product_metadata.get("mime-type", "")
+
+    client.storage.upload_file(
+        dest_path,
+        downloaded.content,
+        file_name,
+        storage_resource_id=storage_arg,
+        content_type=content_type,
+    )
+
+    # The storage facade's upload returns a minimal DataProductModel without a
+    # URI, so register the copy explicitly to obtain a persistent URI.
+    copy_product = _build_copy_data_product(
+        client, source_product, dest_path, file_name, storage_id, username
+    )
+    return client.research.register_data_product(copy_product)
+
+
+def _build_copy_data_product(client, source_product, dest_path, file_name, storage_id, username):
+    """Assemble (without saving) the DataProductModel for a copied input file.
+
+    The new product is a FILE owned by ``username`` in the client's gateway,
+    named ``file_name``, carrying a copy of the source product's metadata and
+    a single TRANSIENT GATEWAY_DATA_STORE replica located at ``dest_path``.
+    A missing ``storage_id`` is stored as an empty string.
+    """
+    copied_metadata = dict(source_product.product_metadata)
+    replica_name = "{} gateway data store copy".format(file_name)
+    copy_replica = replica_catalog_pb2.DataReplicaLocationModel(
+        storage_resource_id=storage_id or "",
+        replica_name=replica_name,
+        replica_location_category=replica_catalog_pb2.GATEWAY_DATA_STORE,
+        replica_persistent_type=replica_catalog_pb2.TRANSIENT,
+        file_path=dest_path,
+    )
+    copy_product = replica_catalog_pb2.DataProductModel(
+        gateway_id=client.gateway_id,
+        owner_name=username,
+        product_name=file_name,
+        data_product_type=replica_catalog_pb2.FILE,
+        product_metadata=copied_metadata,
+        replica_locations=[copy_replica],
+    )
+    return copy_product
+
+
+# ---------------------------------------------------------------------------
+# Intermediate output
+# ---------------------------------------------------------------------------
+
+def _get_output_fetching_processes(experiment):
+    """List the experiment's output-fetching processes, newest first.
+
+    A process qualifies when at least one of its tasks has task type
+    OUTPUT_FETCHING. Ordering is by ``creation_time``, descending. An
+    experiment without processes yields an empty list.
+    """
+    if not experiment.processes:
+        return []
+
+    newest_first = sorted(
+        experiment.processes, key=lambda proc: proc.creation_time, reverse=True
+    )
+    selected = []
+    for proc in newest_first:
+        for task in proc.tasks:
+            if task.task_type == task_pb2.OUTPUT_FETCHING:
+                selected.append(proc)
+                break
+    return selected
+
+
+def get_intermediate_output_process_status(client, experiment, *output_names):
+    """Look up the status of the experiment's intermediate-output fetch process.
+
+    Returns whatever ``client.research.get_intermediate_output_process_status``
+    returns for the experiment id, or None when the experiment has no
+    output-fetching processes or the backend call raises.
+
+    NOTE: the gRPC facade call takes only the experiment id (no per-output
+    filtering), unlike the old Thrift call which passed the output names.
+    ``output_names`` is accepted for signature compatibility but not forwarded.
+    """
+    if not _get_output_fetching_processes(experiment):
+        return None
+
+    try:
+        # TODO(sdk-consolidation): research facade
+        # get_intermediate_output_process_status(experiment_id) does not accept
+        # output names; the old Thrift API filtered by output name. If
+        # per-output status is required, the proto
+        # GetIntermediateOutputProcessStatusRequest needs an output_names field.
+        fetch_status = client.research.get_intermediate_output_process_status(
+            experiment.experiment_id
+        )
+    except Exception:
+        # Best effort: a failed lookup is reported as "no status".
+        logger.debug("Failed to get intermediate output process status", exc_info=True)
+        return None
+    else:
+        return fetch_status
+
+
+def can_fetch_intermediate_output(client, experiment, output_name) -> bool:
+    """Decide whether an intermediate-output fetch may be started.
+
+    Fetching is allowed only when some job's most recent status is ACTIVE and
+    the intermediate-output process status is either unavailable (None) or in
+    a terminal state. Any error while checking the process status is treated
+    as "no fetch in progress", i.e. fetchable.
+
+    NOTE(review): the job state is referenced as ``status_pb2.ACTIVE`` while
+    the process states used in this module carry a ``PROCESS_STATE_`` prefix;
+    confirm the generated enum name.
+    """
+
+    def _latest_status_is_active(job):
+        statuses = job.job_statuses
+        return bool(statuses) and statuses[-1].job_state == status_pb2.ACTIVE
+
+    has_active_job = any(
+        _latest_status_is_active(job)
+        for process in experiment.processes
+        for task in process.tasks
+        for job in task.jobs
+    )
+    if not has_active_job:
+        return False
+
+    try:
+        fetch_status = get_intermediate_output_process_status(client, experiment, output_name)
+        # No status at all means nothing is running, so a fetch can start.
+        return fetch_status is None or fetch_status.state in _TERMINAL_PROCESS_STATES
+    except Exception:
+        # An error here likely means there is no currently running process.
+        return True
+
+
+def fetch_intermediate_output(client, experiment_id, *output_names):
+    """Request a fetch of the named output file(s) of a running experiment.
+
+    The output names are forwarded as a list to
+    ``client.research.get_intermediate_outputs``, whose result is returned.
+    """
+    requested_outputs = list(output_names)
+    return client.research.get_intermediate_outputs(
+        experiment_id, output_names=requested_outputs
+    )
+
+
+def get_intermediate_output_data_products(client, experiment, output_name):
+    """Resolve the data product(s) of the named intermediate output.
+
+    Output-fetching processes are scanned newest first; processes whose latest
+    status is not COMPLETED are skipped. The first process output named
+    ``output_name`` found this way is used. If its value starts with
+    ``airavata-dp://`` it is split on commas and each URI is resolved through
+    ``client.research.get_data_product``.
+
+    Returns a list of data products, empty when nothing matches.
+    """
+    matched_output = None
+    for process in _get_output_fetching_processes(experiment):
+        statuses = process.process_statuses
+        # Only processes whose latest status is COMPLETED are considered.
+        if not statuses or statuses[-1].state != status_pb2.PROCESS_STATE_COMPLETED:
+            continue
+        matched_output = next(
+            (out for out in process.process_outputs if out.name == output_name),
+            None,
+        )
+        if matched_output is not None:
+            break
+
+    if matched_output is None:
+        return []
+    if not matched_output.value.startswith("airavata-dp://"):
+        return []
+    return [
+        client.research.get_data_product(uri)
+        for uri in matched_output.value.split(",")
+    ]
diff --git a/airavata-python-sdk/airavata_sdk/helpers/queue_settings.py b/airavata-python-sdk/airavata_sdk/helpers/queue_settings.py
new file mode 100644
index 0000000000..e6e42915e2
--- /dev/null
+++ b/airavata-python-sdk/airavata_sdk/helpers/queue_settings.py
@@ -0,0 +1,76 @@
+"""Queue-settings calculator registry.
+
+Relocated verbatim (in behavior) from
+``airavata_django_portal_sdk.queue_settings_calculators`` plus the
+``queue_settings_calculator`` decorator from ``decorators.py``. This is a pure
+in-process registry with no framework dependencies.
+
+A gateway registers calculator functions with the
+:func:`queue_settings_calculator` decorator, then invokes them by id via
+:func:`calculate_queue_settings`. The registered callable receives whatever
+positional/keyword arguments the caller passes through, so an existing portal
+caller can keep invoking ``calculate_queue_settings(id, request, experiment_model)``
+unchanged.
+"""
+
+from typing import Callable, NamedTuple
+
+# In-process registry of QueueSettingsCalculator entries, appended to by the
+# queue_settings_calculator decorator in registration order.
+QUEUE_SETTINGS_CALCULATORS = []
+
+
+class QueueSettingsCalculator(NamedTuple):
+    """Registry entry tying a calculator function to its id and name."""
+
+    # Identifier that calculate_queue_settings() and exists() match against.
+    id: str
+    # Name of the calculator; the decorator defaults it to func.__name__.
+    name: str
+    # The registered calculator callable.
+    func: Callable
+
+
+def queue_settings_calculator(_func=None, *, id=None, name=None, **kwargs):
+    """Register a function as a queue settings calculator.
+
+    Usable both bare (``@queue_settings_calculator``) and with keyword
+    arguments (``@queue_settings_calculator(id=..., name=...)``). The decorated
+    function is returned unchanged.
+
+    Args:
+        id: registry id; defaults to ``"<module>:<function name>"``.
+        name: calculator name; defaults to the function's ``__name__``.
+        **kwargs: accepted but not used.
+
+    Raises:
+        Exception: if a calculator with the same id is already registered.
+    """
+
+    def register(func):
+        display_name = func.__name__ if name is None else name
+        calculator_id = (func.__module__ + ":" + func.__name__) if id is None else id
+        if exists(calculator_id):
+            raise Exception(f"Duplicate queue settings calculator id: {calculator_id}")
+        entry = QueueSettingsCalculator(calculator_id, display_name, func)
+        QUEUE_SETTINGS_CALCULATORS.append(entry)
+        return func
+
+    # Bare use passes the function directly; parameterized use expects a
+    # decorator back.
+    return register if _func is None else register(_func)
+
+
+def calculate_queue_settings(calculator_id, *args, **kwargs):
+    """Run the registered queue settings calculator with the given id.
+
+    All positional and keyword arguments after ``calculator_id`` are passed to
+    the calculator function untouched, and its return value is returned.
+
+    Raises:
+        LookupError: if no calculator is registered under ``calculator_id``.
+        Exception: if the calculator itself raises; the original error is
+            chained as the cause.
+    """
+    selected = next(
+        (calc for calc in QUEUE_SETTINGS_CALCULATORS if calc.id == calculator_id),
+        None,
+    )
+    if selected is None:
+        raise LookupError(f"Could not find queue settings calculator for {calculator_id}")
+    try:
+        return selected.func(*args, **kwargs)
+    except Exception as e:
+        raise Exception(f"Failed to calculate queue settings for {calculator_id}") from e
+
+
+def get_all():
+    """Return a new list holding every registered queue settings calculator.
+
+    The returned list is a shallow copy, so changing it does not affect the
+    registry.
+    """
+    return list(QUEUE_SETTINGS_CALCULATORS)
+
+
+def exists(calculator_id):
+    """Return True if a calculator with ``calculator_id`` is registered.
+
+    Any number of matching entries counts as existing, so the duplicate check
+    in the decorator still fires even if the registry list was extended by
+    hand with repeated ids.
+    """
+    return any(calc.id == calculator_id for calc in QUEUE_SETTINGS_CALCULATORS)
+
+
+def reset_registry():
+    """Remove every registered calculator; intended for tests.
+
+    The registry list is emptied in place rather than replaced, so code that
+    imported ``QUEUE_SETTINGS_CALCULATORS`` by name keeps seeing the live
+    registry instead of a stale list.
+    """
+    QUEUE_SETTINGS_CALCULATORS.clear()

Reply via email to