This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 48e800e445 fix: scope large binary storage and cleanup by execution id
(#5280)
48e800e445 is described below
commit 48e800e4458a74cc7bac5c41d8ac3e5eb0e2c7ea
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Sun Jun 7 21:43:57 2026 -0700
fix: scope large binary storage and cleanup by execution id (#5280)
### What changes were proposed in this PR?
Large binaries were stored in the shared `texera-large-binaries` bucket
under flat keys `objects/{timestamp}/{uuid}` with no execution id, and
`clearExecutionResources(eid)` deleted all of them via
`LargeBinaryManager.deleteAllObjects()`. Any cleanup for one execution
therefore erased every other execution's (and user's) large binaries.
This PR namespaces every large binary by its execution id and scopes
deletion:
- Object keys are now `objects/{eid}/{uuid}` on both the JVM and Python
workers.
- The execution-scoped location is chosen by the controller and passed to
workers as a field on `WorkerConfig`, so no protobuf change is needed. The
controller builds the base URI `s3://{bucket}/objects/{eid}/`, and `create()`
appends a unique suffix to it. The JVM seeds the base URI onto the
data-processing thread at startup, and the Python worker receives it as
a startup argument. The user-facing `largebinary()` / `new
LargeBinary()` APIs are unchanged.
- Cleanup uses the new `LargeBinaryManager.deleteByExecution(eid)`
(prefix delete of `objects/{eid}/`). Both JVM and Python engines share
the bucket and key shape, so this single JVM-side delete removes
binaries created by both.
- `deleteAllObjects()` has been removed.
Pre-existing objects under the old `objects/{timestamp}/...` scheme are
left untouched.
### Any related issues, documentation, discussions?
Closes #4123.
### How was this PR tested?
Import the following JSON file to create two workflows (you can
configure the source operator to use any file you have), run
them, and check that each execution creates 6 objects and that one execution
does not remove the other execution's large binary objects.
[Large.Binary.Python (1).json](https://github.com/user-attachments/files/28369502/Large.Binary.Python.1.json)
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Anthropic), models Claude Opus 4.8, Claude
Opus 4.7, and Claude Sonnet 4.6
---------
Signed-off-by: Kunwoo (Chris) <[email protected]>
Co-authored-by: Xiaozhen Liu <[email protected]>
---
.../main/python/core/models/type/large_binary.py | 4 +-
.../src/main/python/core/storage/storage_config.py | 5 +
.../pytexera/storage/large_binary_input_stream.py | 4 +-
.../pytexera/storage/large_binary_manager.py | 110 ++++++++++-------
.../pytexera/storage/large_binary_output_stream.py | 7 +-
amber/src/main/python/texera_run_python_worker.py | 2 +
.../pythonworker/PythonWorkflowWorker.scala | 3 +-
.../scheduling/config/WorkerConfig.scala | 9 +-
.../engine/architecture/worker/DPThread.scala | 10 +-
.../architecture/worker/WorkflowWorker.scala | 8 +-
.../dashboard/user/workflow/WorkflowResource.scala | 6 +-
.../texera/web/service/WorkflowService.scala | 6 +-
.../packaging/test_state_materialization_e2e.py | 1 +
.../python/core/models/type/test_large_binary.py | 6 +-
.../core/storage/iceberg/test_iceberg_document.py | 1 +
.../storage/test_large_binary_input_stream.py | 5 +-
.../pytexera/storage/test_large_binary_manager.py | 133 ++++++++++++---------
.../storage/test_large_binary_output_stream.py | 5 +-
.../scheduling/config/SchedulingConfigsSpec.scala | 19 ++-
.../engine/architecture/worker/DPThreadSpec.scala | 32 ++++-
common/config/src/main/resources/storage.conf | 4 +
.../apache/texera/amber/config/StorageConfig.scala | 1 +
.../texera/service/util/LargeBinaryManager.scala | 86 +++++++++----
.../service/util/LargeBinaryManagerSpec.scala | 115 +++++++++++++-----
.../service/util/LargeBinaryManagerUnitSpec.scala | 72 +++++++++++
25 files changed, 482 insertions(+), 172 deletions(-)
diff --git a/amber/src/main/python/core/models/type/large_binary.py
b/amber/src/main/python/core/models/type/large_binary.py
index 581a688912..34110f374e 100644
--- a/amber/src/main/python/core/models/type/large_binary.py
+++ b/amber/src/main/python/core/models/type/large_binary.py
@@ -63,9 +63,9 @@ class largebinary:
"""
if uri is None:
# Lazy import to avoid circular dependencies
- from pytexera.storage import large_binary_manager
+ from pytexera.storage.large_binary_manager import
LargeBinaryManager
- uri = large_binary_manager.create()
+ uri = LargeBinaryManager().create()
if not uri.startswith("s3://"):
raise ValueError(f"largebinary URI must start with 's3://', got:
{uri}")
diff --git a/amber/src/main/python/core/storage/storage_config.py
b/amber/src/main/python/core/storage/storage_config.py
index 8233590987..05e8c3ee16 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -41,6 +41,9 @@ class StorageConfig:
S3_REGION = None
S3_AUTH_USERNAME = None
S3_AUTH_PASSWORD = None
+ # Execution-scoped base URI (s3://bucket/objects/{eid}/) for this worker's
large
+ # binaries; fixed at process init, which assumes one process per execution.
+ S3_LARGE_BINARIES_BASE_URI = None
@classmethod
def initialize(
@@ -59,6 +62,7 @@ class StorageConfig:
s3_region,
s3_auth_username,
s3_auth_password,
+ s3_large_binaries_base_uri,
):
if cls._initialized:
raise RuntimeError(
@@ -82,6 +86,7 @@ class StorageConfig:
cls.S3_REGION = s3_region
cls.S3_AUTH_USERNAME = s3_auth_username
cls.S3_AUTH_PASSWORD = s3_auth_password
+ cls.S3_LARGE_BINARIES_BASE_URI = s3_large_binaries_base_uri
cls._initialized = True
diff --git
a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
index 68368c5c12..f17373c449 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
@@ -60,9 +60,9 @@ class LargeBinaryInputStream(IOBase):
def _lazy_init(self):
"""Download from S3 on first read operation."""
- from pytexera.storage import large_binary_manager
+ from pytexera.storage.large_binary_manager import LargeBinaryManager
- s3 = large_binary_manager._get_s3_client()
+ s3 = LargeBinaryManager()._get_s3_client()
response = s3.get_object(
Bucket=self._large_binary.get_bucket_name(),
Key=self._large_binary.get_object_key(),
diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py
b/amber/src/main/python/pytexera/storage/large_binary_manager.py
index e061eac622..b323c676d5 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_manager.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py
@@ -22,57 +22,83 @@ Users should not interact with this module directly. Use
largebinary() construct
and LargeBinaryInputStream/LargeBinaryOutputStream instead.
"""
-import time
+import threading
import uuid
from loguru import logger
from core.storage.storage_config import StorageConfig
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
+class LargeBinaryManager:
+ """Manages large binaries in S3 for a worker process.
-def _get_s3_client():
- """Get or initialize S3 client (lazy initialization, cached)."""
- global _s3_client
- if _s3_client is None:
- try:
- import boto3
- from botocore.config import Config
- except ImportError as e:
- raise RuntimeError("boto3 required. Install with: pip install
boto3") from e
+ A singleton, so the cached S3 client is shared process-wide. create()
appends a
+ unique suffix to an execution-scoped base URI handed down by the
controller as
+ process config (``StorageConfig.S3_LARGE_BINARIES_BASE_URI``); the worker
never
+ holds an execution id. This is the Python counterpart of the JVM
+ ``LargeBinaryManager``, which uses a thread-local instead because one JVM
process
+ runs many workers across executions (a Python worker is one process per
execution).
+ """
- _s3_client = boto3.client(
- "s3",
- endpoint_url=StorageConfig.S3_ENDPOINT,
- aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
- aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
- region_name=StorageConfig.S3_REGION,
- config=Config(signature_version="s3v4", s3={"addressing_style":
"path"}),
- )
- return _s3_client
+ _instance = None
+ # Guards singleton creation and S3-client init; reached from the operator
and upload
+ # threads.
+ _lock = threading.Lock()
+ def __new__(cls):
+ # Double-checked locking: skip the lock once the instance exists.
+ if cls._instance is None:
+ with cls._lock:
+ if cls._instance is None:
+ instance = super().__new__(cls)
+ instance._s3_client = None
+ cls._instance = instance
+ return cls._instance
-def _ensure_bucket_exists(bucket: str):
- """Ensure S3 bucket exists, creating it if necessary."""
- s3 = _get_s3_client()
- try:
- s3.head_bucket(Bucket=bucket)
- except s3.exceptions.NoSuchBucket:
- logger.debug(f"Bucket {bucket} not found, creating it")
- s3.create_bucket(Bucket=bucket)
- logger.info(f"Created bucket: {bucket}")
+ def _get_s3_client(self):
+ """Get or initialize the S3 client (lazy initialization, cached)."""
+ if self._s3_client is None:
+ with self._lock:
+ if self._s3_client is None:
+ try:
+ import boto3
+ from botocore.config import Config
+ except ImportError as e:
+ raise RuntimeError(
+ "boto3 required. Install with: pip install boto3"
+ ) from e
+ self._s3_client = boto3.client(
+ "s3",
+ endpoint_url=StorageConfig.S3_ENDPOINT,
+ aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+ aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+ region_name=StorageConfig.S3_REGION,
+ config=Config(
+ signature_version="s3v4", s3={"addressing_style":
"path"}
+ ),
+ )
+ return self._s3_client
-def create() -> str:
- """
- Creates a new largebinary reference with a unique S3 URI.
+ def _ensure_bucket_exists(self, bucket: str):
+ """Ensure the S3 bucket exists, creating it if necessary."""
+ s3 = self._get_s3_client()
+ try:
+ s3.head_bucket(Bucket=bucket)
+ except s3.exceptions.NoSuchBucket:
+ logger.debug(f"Bucket {bucket} not found, creating it")
+ s3.create_bucket(Bucket=bucket)
+ logger.info(f"Created bucket: {bucket}")
- Returns:
- S3 URI string (format: s3://bucket/key)
- """
- _ensure_bucket_exists(DEFAULT_BUCKET)
- timestamp_ms = int(time.time() * 1000)
- unique_id = uuid.uuid4()
- object_key = f"objects/{timestamp_ms}/{unique_id}"
- return f"s3://{DEFAULT_BUCKET}/{object_key}"
+ def create(self) -> str:
+ """Append a unique suffix to the controller-provided base URI.
+
+ Pure string construction (no S3 round-trip); the bucket is created on
demand at
+ upload time. Returns e.g.
``s3://bucket/objects/{execution_id}/{uuid}``.
+ """
+ base_uri = StorageConfig.S3_LARGE_BINARIES_BASE_URI
+ if not base_uri:
+ raise RuntimeError(
+ "largebinary() requires a large-binaries base URI, but none is
"
+ "configured (StorageConfig.S3_LARGE_BINARIES_BASE_URI is
unset)."
+ )
+ return f"{base_uri}{uuid.uuid4()}"
diff --git
a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index 1ca575dd42..63d4caeb6b 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -29,7 +29,7 @@ Usage:
from typing import Optional, Union
from io import IOBase
from core.models.type.large_binary import largebinary
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
import threading
import queue
@@ -155,8 +155,9 @@ class LargeBinaryOutputStream(IOBase):
def upload_worker():
s3 = None
try:
-
large_binary_manager._ensure_bucket_exists(self._bucket_name)
- s3 = large_binary_manager._get_s3_client()
+ manager = LargeBinaryManager()
+ manager._ensure_bucket_exists(self._bucket_name)
+ s3 = manager._get_s3_client()
reader = _QueueReader(self._queue)
s3.upload_fileobj(reader, self._bucket_name,
self._object_key)
except Exception as e:
diff --git a/amber/src/main/python/texera_run_python_worker.py
b/amber/src/main/python/texera_run_python_worker.py
index f1ba8c7db6..5c3e25e096 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -70,6 +70,7 @@ if __name__ == "__main__":
s3_region,
s3_auth_username,
s3_auth_password,
+ s3_large_binaries_base_uri,
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
@@ -87,6 +88,7 @@ if __name__ == "__main__":
s3_region,
s3_auth_username,
s3_auth_password,
+ s3_large_binaries_base_uri,
)
# Setting R_HOME environment variable for R-UDF usage
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index b8bb3a92b6..c7ed3b6324 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -209,7 +209,8 @@ class PythonWorkflowWorker(
StorageConfig.s3Endpoint,
StorageConfig.s3Region,
StorageConfig.s3Username,
- StorageConfig.s3Password
+ StorageConfig.s3Password,
+ workerConfig.largeBinaryBaseUri
)
).run(BasicIO.standard(false))
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
index bb8f3d5775..8166dee6dd 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
@@ -23,6 +23,7 @@ import org.apache.texera.amber.config.ApplicationConfig
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.PhysicalOp
import org.apache.texera.amber.util.VirtualIdentityUtils
+import org.apache.texera.service.util.LargeBinaryManager
case object WorkerConfig {
def generateWorkerConfigs(
@@ -45,7 +46,8 @@ case object WorkerConfig {
WorkerConfig(
VirtualIdentityUtils.createWorkerIdentity(physicalOp.workflowId,
physicalOp.id, idx),
pveName = physicalOp.pveName,
- cuid = cuid
+ cuid = cuid,
+ largeBinaryBaseUri =
LargeBinaryManager.baseUriForExecution(physicalOp.executionId.id)
)
)
}
@@ -54,5 +56,8 @@ case object WorkerConfig {
case class WorkerConfig(
workerId: ActorVirtualIdentity,
pveName: String = "",
- cuid: Option[Int] = None
+ cuid: Option[Int] = None,
+ // Controller-named, execution-scoped base URI under which this worker's
large binaries
+ // live; create() appends a unique suffix. Empty when large binaries are
unconfigured.
+ largeBinaryBaseUri: String = ""
)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
index 0c136d613a..fd4d7786c8 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
@@ -39,6 +39,7 @@ import org.apache.texera.amber.engine.common.ambermessage.{
}
import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
import org.apache.texera.amber.error.ErrorUtils.safely
+import org.apache.texera.service.util.LargeBinaryManager
import java.util.concurrent._
@@ -46,7 +47,10 @@ class DPThread(
val actorId: ActorVirtualIdentity,
dp: DataProcessor,
logManager: ReplayLogManager,
- internalQueue: LinkedBlockingQueue[DPInputQueueElement]
+ internalQueue: LinkedBlockingQueue[DPInputQueueElement],
+ // Controller-named, execution-scoped base URI for large binaries created
on this
+ // thread (empty when unconfigured). Seeded into LargeBinaryManager at
thread start.
+ largeBinaryBaseUri: String = ""
) extends AmberLogging {
// initialize dp thread upon construction
@@ -91,6 +95,10 @@ class DPThread(
dpThread = dpThreadExecutor.submit(new Runnable() {
def run(): Unit = {
Thread.currentThread().setName(getThreadName)
+ // Seed this thread's large-binary base URI (from WorkerConfig)
before any tuple,
+ // so create() can append a suffix without the execution id. Once
per thread,
+ // assuming a thread serves one execution.
+ LargeBinaryManager.setCurrentBaseUri(largeBinaryBaseUri)
logger.info("DP thread started")
startFuture.complete(())
dp.statisticsManager.initializeWorkerStartTime(System.nanoTime())
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
index d1a0a300d9..13b1d94b04 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
@@ -107,7 +107,13 @@ class WorkflowWorker(
)
}
// dp is ready
- dpThread = new DPThread(workerConfig.workerId, dp, logManager, inputQueue)
+ dpThread = new DPThread(
+ workerConfig.workerId,
+ dp,
+ logManager,
+ inputQueue,
+ workerConfig.largeBinaryBaseUri
+ )
dpThread.start()
}
diff --git
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
index cb910d11c3..ee559048c9 100644
---
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
+++
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
@@ -611,8 +611,6 @@ class WorkflowResource extends LazyLogging {
.asScala
.toList
- LargeBinaryManager.deleteAllObjects()
-
// Collect all URIs related to executions for cleanup
val uris = eids.flatMap { eid =>
val executionId = ExecutionIdentity(eid.longValue())
@@ -638,6 +636,10 @@ class WorkflowResource extends LazyLogging {
}
}
+ // Delete large binaries for each execution belonging to the workflows
being
+ // removed. Done after the transaction (like the document cleanup below).
+ eids.foreach(eid =>
LargeBinaryManager.deleteByExecution(eid.longValue()))
+
// Clean up document storage
try {
uris.foreach { uri =>
diff --git
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 0044c95967..c18b8b50e8 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -318,7 +318,7 @@ class WorkflowService(
* 2. Clears URI references from the execution registry
* 3. Safely clears all result and console message documents
* 4. Expires Iceberg snapshots for runtime statistics
- * 5. Deletes large binaries from MinIO
+ * 5. Deletes this execution's large binaries from MinIO
*
* @param eid The execution identity to clean up resources for
*/
@@ -355,7 +355,7 @@ class WorkflowService(
logger.debug(s"Error processing document at $uri:
${error.getMessage}")
}
}
- // Delete large binaries
- LargeBinaryManager.deleteAllObjects()
+ // Delete this execution's large binaries
+ LargeBinaryManager.deleteByExecution(eid.id)
}
}
diff --git
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
index 9d1fd30698..cfc4f7f676 100644
---
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
+++
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -108,6 +108,7 @@ def sqlite_iceberg_catalog():
s3_region="unused",
s3_auth_username="unused",
s3_auth_password="unused",
+ s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/",
)
original_instance = IcebergCatalogInstance._instance
diff --git a/amber/src/test/python/core/models/type/test_large_binary.py
b/amber/src/test/python/core/models/type/test_large_binary.py
index 36310e1dd5..56348f4265 100644
--- a/amber/src/test/python/core/models/type/test_large_binary.py
+++ b/amber/src/test/python/core/models/type/test_large_binary.py
@@ -18,6 +18,10 @@
import pytest
from unittest.mock import patch
from core.models.type.large_binary import largebinary
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
class TestLargeBinary:
@@ -31,7 +35,7 @@ class TestLargeBinary:
def test_create_without_uri(self):
"""Test creating largebinary without URI (calls
large_binary_manager.create)."""
- with patch("pytexera.storage.large_binary_manager.create") as
mock_create:
+ with patch.object(large_binary_manager, "create") as mock_create:
mock_create.return_value = "s3://bucket/objects/123/uuid"
large_binary = largebinary()
assert large_binary.uri == "s3://bucket/objects/123/uuid"
diff --git
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index bdba00ba0c..5e61b4ed68 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -57,6 +57,7 @@ StorageConfig.initialize(
s3_region="us-east-1",
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
+ s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/",
)
diff --git
a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
index 35bc5bc634..7e6a401aef 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
@@ -20,7 +20,10 @@ from unittest.mock import patch, MagicMock
from io import BytesIO
from core.models.type.large_binary import largebinary
from pytexera.storage.large_binary_input_stream import LargeBinaryInputStream
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
class TestLargeBinaryInputStream:
diff --git
a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
index 1942e91f8b..e1da5eee18 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
@@ -15,34 +15,44 @@
# specific language governing permissions and limitations
# under the License.
+import re
+
import pytest
from unittest.mock import patch, MagicMock
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
from core.storage.storage_config import StorageConfig
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
+
+# Execution-scoped base URI the controller hands down; create() appends a
unique suffix.
+TEST_BASE_URI = "s3://texera-large-binaries/objects/1/"
+
+
[email protected](autouse=True)
+def _init_storage_config():
+ """Initialize StorageConfig (incl. the large-binaries base URI) for every
test."""
+ if not StorageConfig._initialized:
+ StorageConfig.initialize(
+ catalog_type="postgres",
+ postgres_uri_without_scheme="localhost:5432/test",
+ postgres_username="test",
+ postgres_password="test",
+ rest_catalog_uri="http://localhost:8181/catalog/",
+ rest_catalog_warehouse_name="texera",
+ table_result_namespace="test",
+ table_state_namespace="test-state",
+ directory_path="/tmp/test",
+ commit_batch_size=1000,
+ s3_endpoint="http://localhost:9000",
+ s3_region="us-east-1",
+ s3_auth_username="minioadmin",
+ s3_auth_password="minioadmin",
+ s3_large_binaries_base_uri=TEST_BASE_URI,
+ )
-class TestLargeBinaryManager:
- @pytest.fixture(autouse=True)
- def setup_storage_config(self):
- """Initialize StorageConfig for tests."""
- if not StorageConfig._initialized:
- StorageConfig.initialize(
- catalog_type="postgres",
- postgres_uri_without_scheme="localhost:5432/test",
- postgres_username="test",
- postgres_password="test",
- rest_catalog_uri="http://localhost:8181/catalog/",
- rest_catalog_warehouse_name="texera",
- table_result_namespace="test",
- table_state_namespace="test-state",
- directory_path="/tmp/test",
- commit_batch_size=1000,
- s3_endpoint="http://localhost:9000",
- s3_region="us-east-1",
- s3_auth_username="minioadmin",
- s3_auth_password="minioadmin",
- )
+class TestLargeBinaryManager:
def test_get_s3_client_initializes_once(self):
"""Test that S3 client is initialized and cached."""
# Reset the client
@@ -118,37 +128,48 @@ class TestLargeBinaryManager:
mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket")
mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket")
- def test_create_generates_unique_uri(self):
- """Test that create() generates a unique S3 URI."""
- large_binary_manager._s3_client = None
-
- with patch("boto3.client") as mock_boto3_client:
- mock_client = MagicMock()
- mock_boto3_client.return_value = mock_client
- mock_client.head_bucket.return_value = None
- mock_client.exceptions.NoSuchBucket = type("NoSuchBucket",
(Exception,), {})
-
- uri = large_binary_manager.create()
-
- # Check URI format
- assert uri.startswith("s3://")
- assert
uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/")
- assert "objects/" in uri
-
- # Verify bucket was checked/created
- mock_client.head_bucket.assert_called_once_with(
- Bucket=large_binary_manager.DEFAULT_BUCKET
- )
-
- def test_create_uses_default_bucket(self):
- """Test that create() uses the default bucket."""
- large_binary_manager._s3_client = None
-
- with patch("boto3.client") as mock_boto3_client:
- mock_client = MagicMock()
- mock_boto3_client.return_value = mock_client
- mock_client.head_bucket.return_value = None
- mock_client.exceptions.NoSuchBucket = type("NoSuchBucket",
(Exception,), {})
-
- uri = large_binary_manager.create()
- assert large_binary_manager.DEFAULT_BUCKET in uri
+ def test_create_appends_unique_suffix_to_base_uri(self):
+ """create() returns the configured base URI plus a unique suffix (no
S3 call)."""
+ base = StorageConfig.S3_LARGE_BINARIES_BASE_URI
+
+ uri1 = large_binary_manager.create()
+ uri2 = large_binary_manager.create()
+
+ assert uri1.startswith(base)
+ assert uri2.startswith(base)
+ # A non-empty, unique suffix follows the base URI.
+ assert uri1 != base
+ assert uri1 != uri2
+
+
+def test_create_matches_execution_scoped_key_shape(monkeypatch):
+ # The base URI is execution-scoped (controller-named); create() only
appends a uuid.
+ monkeypatch.setattr(
+ StorageConfig,
+ "S3_LARGE_BINARIES_BASE_URI",
+ "s3://texera-large-binaries/objects/42/",
+ )
+ uri = large_binary_manager.create()
+ assert
re.fullmatch(r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri)
+
+
+def test_create_without_base_uri_raises(monkeypatch):
+ # An unconfigured base URI should fail with a clear error, not a cryptic
S3 one.
+ monkeypatch.setattr(StorageConfig, "S3_LARGE_BINARIES_BASE_URI", None)
+ with pytest.raises(RuntimeError):
+ large_binary_manager.create()
+
+
+def test_largebinarymanager_is_a_singleton():
+ # Constructing the manager always returns the same shared instance.
+ assert LargeBinaryManager() is LargeBinaryManager()
+
+ # State (the cached S3 client) is shared across handles (same instance).
+ mgr = LargeBinaryManager()
+ original = mgr._s3_client
+ sentinel = object()
+ mgr._s3_client = sentinel
+ try:
+ assert LargeBinaryManager()._s3_client is sentinel
+ finally:
+ mgr._s3_client = original
diff --git
a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
index e35f1400fd..dc8881ab16 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
@@ -24,7 +24,10 @@ from pytexera.storage.large_binary_output_stream import (
LargeBinaryOutputStream,
_QueueReader,
)
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
class TestLargeBinaryOutputStream:
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
index 25ef739251..31ab5c8e1a 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
@@ -31,6 +31,7 @@ import org.apache.texera.amber.core.virtualidentity.{
}
import org.apache.texera.amber.core.workflow._
import
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
+import org.apache.texera.service.util.LargeBinaryManager
import org.scalatest.flatspec.AnyFlatSpec
import java.net.URI
@@ -273,11 +274,15 @@ class SchedulingConfigsSpec extends AnyFlatSpec {
// WorkerConfig.generateWorkerConfigs
//
---------------------------------------------------------------------------
- private def physicalOp(parallelizable: Boolean, suggested: Option[Int]):
PhysicalOp =
+ private def physicalOp(
+ parallelizable: Boolean,
+ suggested: Option[Int],
+ executionId: ExecutionIdentity = ExecutionIdentity(0)
+ ): PhysicalOp =
PhysicalOp(
PhysicalOpIdentity(OperatorIdentity("op"), "main"),
WorkflowIdentity(0),
- ExecutionIdentity(0),
+ executionId,
OpExecInitInfo.Empty,
parallelizable = parallelizable,
suggestedWorkerNum = suggested
@@ -308,4 +313,14 @@ class SchedulingConfigsSpec extends AnyFlatSpec {
WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = true,
suggested = None))
assert(configs.size == ApplicationConfig.numWorkerPerOperatorByDefault)
}
+
+ it should "set largeBinaryBaseUri to the execution-scoped base URI for every
worker" in {
+ val eid = ExecutionIdentity(42L)
+ val configs = WorkerConfig.generateWorkerConfigs(
+ physicalOp(parallelizable = true, suggested = Some(3), executionId = eid)
+ )
+ val expected = LargeBinaryManager.baseUriForExecution(eid.id)
+ assert(expected.contains(s"objects/${eid.id}/"))
+ assert(configs.forall(_.largeBinaryBaseUri == expected))
+ }
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
index d8b5d57d63..eb1241182e 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
@@ -42,11 +42,12 @@ import
org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFI
import
org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage
import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
+import org.apache.texera.service.util.LargeBinaryManager
import org.scalamock.scalatest.MockFactory
import org.scalatest.flatspec.AnyFlatSpec
import java.net.URI
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{CompletableFuture, LinkedBlockingQueue, TimeUnit}
class DPThreadSpec extends AnyFlatSpec with MockFactory {
@@ -240,4 +241,35 @@ class DPThreadSpec extends AnyFlatSpec with MockFactory {
assert(logs.length > 1)
}
+ "DP Thread" should "seed the base URI so create() yields execution-scoped
keys" in {
+ val eid = 7777L
+ val baseUri = LargeBinaryManager.baseUriForExecution(eid)
+ // create() runs on the DP thread; capture its result there. A create() failure is
+ // forwarded too, so get() reports the real error instead of timing out.
+ val capturedUri = new CompletableFuture[String]()
+ val inputQueue = new LinkedBlockingQueue[DPInputQueueElement]()
+ val dp = new DataProcessor(workerId, x => {}, inputMessageQueue =
inputQueue)
+ dp.executor = new OperatorExecutor {
+ override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike]
= {
+ try capturedUri.complete(LargeBinaryManager.create())
+ catch { case e: Exception => capturedUri.completeExceptionally(e) }
+ Iterator.empty
+ }
+ }
+ dp.inputManager.addPort(mockInputPortId, schema, List.empty, List.empty)
+ dp.inputGateway.getChannel(dataChannelId).setPortId(mockInputPortId)
+ dp.adaptiveBatchingMonitor = mock[WorkerTimerService]
+ (dp.adaptiveBatchingMonitor.resumeAdaptiveBatching
_).expects().anyNumberOfTimes()
+ val dpThread = new DPThread(workerId, dp, logManager, inputQueue, baseUri)
+ dpThread.start()
+ inputQueue.put(
+ FIFOMessageElement(WorkflowFIFOMessage(dataChannelId, 0,
DataFrame(Array(tuples(0)))))
+ )
+
+ val uri = capturedUri.get(5, TimeUnit.SECONDS)
+
assert(uri.startsWith(s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/$eid/"))
+ // a unique suffix is appended to the execution-scoped base URI
+ assert(uri.length > baseUri.length)
+ }
+
}
diff --git a/common/config/src/main/resources/storage.conf
b/common/config/src/main/resources/storage.conf
index 0682109f19..12a9919e04 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -115,6 +115,10 @@ storage {
region = "us-west-2"
region = ${?STORAGE_S3_REGION}
+ # Bucket for large binaries, shared by the JVM and Python workers and by per-execution cleanup.
+ large-binaries-bucket = "texera-large-binaries"
+ large-binaries-bucket = ${?STORAGE_S3_LARGE_BINARIES_BUCKET}
+
multipart {
part-size = "16MB"
part-size = ${?STORAGE_S3_MULTIPART_PART_SIZE}
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 07447cfdbe..c88541cf1b 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -80,6 +80,7 @@ object StorageConfig {
val s3Region: String = conf.getString("storage.s3.region")
val s3Username: String = conf.getString("storage.s3.auth.username")
val s3Password: String = conf.getString("storage.s3.auth.password")
+ val s3LargeBinariesBucket: String =
conf.getString("storage.s3.large-binaries-bucket") // bucket shared by JVM/Python workers and large-binary cleanup
val s3MultipartUploadPartSize: Long = parseSizeStringToBytes(
conf.getString("storage.s3.multipart.part-size")
)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
index 44db3929f2..2fa4acb530 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
@@ -20,44 +20,88 @@
package org.apache.texera.service.util
import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.amber.config.StorageConfig
import java.util.UUID
-/**
- * Manages the lifecycle of LargeBinaries stored in S3.
- *
- * Handles creation and deletion of large objects that exceed
- * normal tuple size limits.
- */
+/** Manages the lifecycle of LargeBinaries (objects too large for normal
tuples) in S3. */
object LargeBinaryManager extends LazyLogging {
- val DEFAULT_BUCKET: String = "texera-large-binaries"
+ // From config so the JVM/Python workers and cleanup all share one bucket.
+ val DEFAULT_BUCKET: String = StorageConfig.s3LargeBinariesBucket
+
+ /** Per-execution key prefix, with no trailing slash (baseUriForExecution appends one); the
single source of truth for the write and delete paths. */
+ private def executionPrefix(executionId: Long): String =
s"objects/$executionId"
/**
- * Creates a new LargeBinary reference.
- * The actual data upload happens separately via LargeBinaryOutputStream.
+ * Base URI (trailing slash) under which `executionId`'s large binaries
live; create()
+ * appends a unique suffix. Empty when the bucket is unconfigured, so
create() fails loudly.
*
- * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+ * `executionId` must be a persisted EID. The sentinel id
(DEFAULT_EXECUTION_ID = 1) shares
+ * this space, so binaries must only be created under a real execution —
else execution 1
+ * and a default-context run would collide under objects/1/.
*/
- def create(): String = {
- val objectKey =
s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
- val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
+ def baseUriForExecution(executionId: Long): String =
+ if (DEFAULT_BUCKET.isEmpty) ""
+ else s"s3://$DEFAULT_BUCKET/${executionPrefix(executionId)}/"
+
+ /**
+ * Base URI for binaries created on the current thread — thread-local
because create()
+ * runs on the DP thread. Seeded once at DP-thread start, assuming a thread
serves one
+ * execution; re-seed if workers are ever reused across executions.
+ */
+ private val currentBaseUri: ThreadLocal[Option[String]] =
+ ThreadLocal.withInitial(() => Option.empty[String])
+
+ /** Sets the current thread's base URI; an empty or null value clears it, so
create() fails loudly. */
+ def setCurrentBaseUri(baseUri: String): Unit =
+ currentBaseUri.set(Option(baseUri).filter(_.nonEmpty))
- uri
+ /**
+ * Creates a LargeBinary reference by appending a unique suffix to the
current thread's
+ * base URI. Data is uploaded separately via LargeBinaryOutputStream.
+ * @return e.g. s3://bucket/objects/{eid}/{uuid}
+ * @throws IllegalStateException if no base URI is set on the current thread
+ */
+ def create(): String = {
+ val baseUri = currentBaseUri
+ .get()
+ .getOrElse(
+ throw new IllegalStateException(
+ "LargeBinaryManager.create() requires a base URI, " +
+ "but none was set on the current thread."
+ )
+ )
+ s"$baseUri${UUID.randomUUID()}"
}
/**
- * Deletes all large binaries from the bucket.
+ * Deletes all large binaries for one execution, i.e. keys under objects/{executionId};
exceptions are logged, not thrown.
+ * Uses deleteDirectory, which removes one listing page (<= 1000 objects) — enough for
expected counts; more needs a paginated delete.
*
- * @throws java.lang.Exception if the deletion fails
- * @return Unit
+ * @param executionId the execution whose large binaries should be removed
*/
- def deleteAllObjects(): Unit = {
+ def deleteByExecution(executionId: Long): Unit =
+ deleteByExecution(executionId, S3StorageClient.deleteDirectory)
+
+ /** Overload taking the delete op, called as deleteDir(bucket, prefix). Visible for testing.
NOTE(review): the prefix has no trailing slash; confirm deleteDir treats it as a directory. */
+ private[util] def deleteByExecution(
+ executionId: Long,
+ deleteDir: (String, String) => Unit
+ ): Unit = {
try {
- S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
- logger.info(s"Successfully deleted all large binaries from bucket:
$DEFAULT_BUCKET")
+ deleteDir(DEFAULT_BUCKET, executionPrefix(executionId))
+ logger.info(
+ s"Deleted large binaries for execution $executionId from bucket:
$DEFAULT_BUCKET"
+ )
} catch {
+ // Swallowed: this cleanup is a side effect of deleting the execution and must not fail it.
+ // Logged at error because a failure here silently leaks storage.
case e: Exception =>
- logger.warn(s"Failed to delete large binaries from bucket:
$DEFAULT_BUCKET", e)
+ logger.error(
+ s"Failed to delete large binaries for execution $executionId " +
+ s"from bucket: $DEFAULT_BUCKET",
+ e
+ )
}
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
index 77d142efee..a0436e682e 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
@@ -21,8 +21,28 @@ package org.apache.texera.service.util
import org.apache.texera.amber.core.tuple.LargeBinary
import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfterEach
-class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase {
+class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with
BeforeAndAfterEach {
+
+ /** Execution id used by the bulk of the tests. */
+ private val TestExecutionId: Long = 9999L
+
+ /** Seeds the thread's base URI for an execution, as the DP thread does at startup in
production. */
+ private def setExecutionContext(eid: Long): Unit =
+
LargeBinaryManager.setCurrentBaseUri(LargeBinaryManager.baseUriForExecution(eid))
+
+ /** Each test creates large binaries; they need a base URI on the thread. */
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ setExecutionContext(TestExecutionId)
+ }
+
+ /** Clears the thread's base URI so it cannot leak into other suites run on this thread. */
+ override def afterEach(): Unit = {
+ LargeBinaryManager.setCurrentBaseUri("")
+ super.afterEach()
+ }
/** Creates a large binary from string data and returns it. */
private def createLargeBinary(data: String): LargeBinary = {
@@ -54,7 +68,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(stream.readAllBytes().sameElements(data.getBytes))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should read exact number of bytes") {
@@ -67,7 +81,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(result.sameElements("0123456789".getBytes))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should handle reading more bytes than
available") {
@@ -81,7 +95,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(result.sameElements(data.getBytes))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should support standard single-byte read") {
@@ -94,7 +108,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(stream.read() == -1) // EOF
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should return -1 at EOF") {
@@ -105,7 +119,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(stream.read() == -1)
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should throw exception when reading from closed
stream") {
@@ -117,7 +131,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertThrows[java.io.IOException](stream.read())
assertThrows[java.io.IOException](stream.readAllBytes())
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should handle multiple close calls") {
@@ -127,7 +141,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
stream.close()
stream.close() // Should not throw
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should read large data correctly") {
@@ -145,7 +159,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(result.sameElements(largeData))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
// ========================================
@@ -183,7 +197,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
}
}
- test("LargeBinaryManager should delete all large binaries") {
+ test("deleteByExecution removes binaries written via new LargeBinary()") {
val pointer1 = new LargeBinary()
val out1 = new LargeBinaryOutputStream(pointer1)
try {
@@ -200,18 +214,22 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
out2.close()
}
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should handle delete with no objects gracefully") {
- LargeBinaryManager.deleteAllObjects() // Should not throw exception
+ LargeBinaryManager.deleteByExecution(TestExecutionId) // Should not throw
exception
}
- test("LargeBinaryManager should delete all objects") {
+ test("deleteByExecution removes multiple binaries for the execution") {
val pointer1 = createLargeBinary("Test data")
val pointer2 = createLargeBinary("Test data")
+ // Pin the behavior the name claims: the execution's objects exist, then are gone.
+ val prefix = s"objects/$TestExecutionId"
+ assert(S3StorageClient.directoryExists(LargeBinaryManager.DEFAULT_BUCKET, prefix))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
+ assert(!S3StorageClient.directoryExists(LargeBinaryManager.DEFAULT_BUCKET, prefix))
}
test("LargeBinaryManager should create bucket if it doesn't exist") {
@@ -219,7 +233,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertStandardBucket(pointer)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should handle large objects correctly") {
@@ -237,7 +251,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
stream.close()
assert(readData.sameElements(largeData))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should generate unique URIs for different objects")
{
@@ -261,7 +275,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(pointer1.getUri != pointer2.getUri)
assert(pointer1.getObjectKey != pointer2.getObjectKey)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should handle multiple reads from the same
large binary") {
@@ -279,7 +293,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData1.sameElements(data.getBytes))
assert(readData2.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should properly parse bucket name and object key
from large binary") {
@@ -289,7 +303,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(largeBinary.getObjectKey.nonEmpty)
assert(!largeBinary.getObjectKey.startsWith("/"))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
// ========================================
@@ -309,7 +323,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertStandardBucket(largeBinary)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream constructor should read large binary contents")
{
@@ -322,7 +336,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream and LargeBinaryInputStream should work
together end-to-end") {
@@ -344,7 +358,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
// ========================================
@@ -368,7 +382,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should create large binary") {
@@ -381,7 +395,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertStandardBucket(largeBinary)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should handle large data correctly") {
@@ -399,7 +413,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(largeData))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should handle multiple writes") {
@@ -416,7 +430,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements("Hello World!".getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should throw exception when writing to closed
stream") {
@@ -427,7 +441,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertThrows[java.io.IOException](outStream.write("more".getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should handle close() being called multiple
times") {
@@ -437,7 +451,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
outStream.close()
outStream.close() // Should not throw
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("New LargeBinary() constructor should create unique URIs") {
@@ -447,7 +461,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(largeBinary1.getUri != largeBinary2.getUri)
assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with
input") {
@@ -466,6 +480,52 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
+ }
+
+ test("deleteByExecution removes only the target execution's binaries") {
+ val bucket = LargeBinaryManager.DEFAULT_BUCKET // configured bucket, not a literal
+ try {
+ // Create one binary under execution 1001 and another under 1002.
+ setExecutionContext(1001L)
+ createLargeBinary("data for 1001")
+ setExecutionContext(1002L)
+ createLargeBinary("data for 1002")
+
+ // Delete only execution 1001's binaries.
+ LargeBinaryManager.deleteByExecution(1001L)
+
+ assert(!S3StorageClient.directoryExists(bucket, "objects/1001"))
+ assert(S3StorageClient.directoryExists(bucket, "objects/1002"))
+ } finally {
+ // Keep the test self-contained: clean up both executions (1001 too, in case a step
+ // failed before its delete) and reset the thread's base URI rather than relying on
+ // the next test's beforeEach.
+ LargeBinaryManager.deleteByExecution(1001L)
+ LargeBinaryManager.deleteByExecution(1002L)
+ setExecutionContext(TestExecutionId)
+ }
+ }
+
+ test("deleteByExecution distinguishes executions whose ids share a numeric prefix") {
+ // "objects/1" is a string prefix of "objects/11"; a prefix delete that dropped the
+ // trailing "/" would wipe both. Pin that deleting execution 1 leaves 11 untouched.
+ val bucket = LargeBinaryManager.DEFAULT_BUCKET // configured bucket, not a literal
+ try {
+ setExecutionContext(1L)
+ createLargeBinary("data for execution 1")
+ setExecutionContext(11L)
+ createLargeBinary("data for execution 11")
+
+ LargeBinaryManager.deleteByExecution(1L)
+
+ assert(!S3StorageClient.directoryExists(bucket, "objects/1"))
+ assert(S3StorageClient.directoryExists(bucket, "objects/11"))
+ } finally {
+ // Self-contained cleanup, mirroring the isolation test above.
+ LargeBinaryManager.deleteByExecution(1L)
+ LargeBinaryManager.deleteByExecution(11L)
+ setExecutionContext(TestExecutionId)
+ }
}
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
new file mode 100644
index 0000000000..d6268d29d3
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.scalatest.funsuite.AnyFunSuite
+
+import scala.util.Try
+
+/**
+ * Unit tests for [[LargeBinaryManager.deleteByExecution]] with the directory-delete op
+ * injected, so success and the swallow-and-log error path run without a live S3 endpoint.
+ * Also covers create()'s thread-local base URI, exercised on dedicated threads.
+ */
+class LargeBinaryManagerUnitSpec extends AnyFunSuite {
+
+ /**
+ * Runs `body` on a fresh thread and returns its outcome. create() reads a thread-local, so
+ * a dedicated thread isolates the base URI from other tests; a failure on that thread is
+ * returned as a Left instead of being lost to its uncaught-exception handler.
+ */
+ private def onFreshThread[A](body: => A): Either[Throwable, A] = {
+ @volatile var outcome: Option[Either[Throwable, A]] = None
+ val thread = new Thread(() => outcome = Some(Try(body).toEither))
+ thread.start()
+ thread.join()
+ outcome.getOrElse(fail("worker thread ended without recording an outcome"))
+ }
+
+ test("deleteByExecution issues a delete scoped to the execution's object prefix") {
+ var captured: Option[(String, String)] = None
+ LargeBinaryManager.deleteByExecution(
+ 42L,
+ (bucket, prefix) => captured = Some((bucket, prefix))
+ )
+ assert(captured.contains((LargeBinaryManager.DEFAULT_BUCKET, "objects/42")))
+ }
+
+ test("deleteByExecution swallows exceptions raised by the underlying delete") {
+ // The error path logs and returns; it must not propagate the failure to callers.
+ LargeBinaryManager.deleteByExecution(7L, (_, _) => throw new RuntimeException("boom"))
+ succeed
+ }
+
+ test("create returns a URI under the current thread's base URI") {
+ val result = onFreshThread {
+ LargeBinaryManager.setCurrentBaseUri(LargeBinaryManager.baseUriForExecution(555L))
+ LargeBinaryManager.create()
+ }
+ val uri = result match {
+ case Right(value) => value
+ case Left(e) => fail("create() failed on the worker thread", e)
+ }
+ val prefix = s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/555/"
+ assert(uri.startsWith(prefix))
+ // a unique (UUID) suffix follows the execution-scoped prefix
+ assert(uri.stripPrefix(prefix).nonEmpty)
+ }
+
+ test("create throws when no base URI is set on the thread") {
+ // A fresh thread starts with no base URI, so create() must fail fast.
+ val result = onFreshThread(LargeBinaryManager.create())
+ assert(result.swap.exists(_.isInstanceOf[IllegalStateException]))
+ }
+}