Xiao-zhen-Liu commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3358425311
##########
amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala:
##########
@@ -611,7 +611,8 @@ class WorkflowResource extends LazyLogging {
.asScala
.toList
- LargeBinaryManager.deleteAllObjects()
+ // Delete large binaries for each execution belonging to the workflows being removed
+ eids.foreach(eid => LargeBinaryManager.deleteByExecution(eid.longValue()))
Review Comment:
The large binaries are deleted here, but the database deletion happens in
the transaction at line 632. If that transaction rolls back (e.g. one of the
workflow ids doesn't belong to the user), the workflow rows survive but their
binaries are already gone. The document cleanup just below already runs *after*
the transaction for this exact reason — I'd move this `deleteByExecution` call
down next to it.
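A minimal sketch of that ordering in Python, with `sqlite3` and a dict standing in for the database and S3 (the schema, `make_db`, `delete_workflows`, and `binaries` are hypothetical, not Texera's code): the ownership check and row deletes share one transaction, and binaries are removed only after it commits.

```python
import sqlite3

# Hypothetical stand-in for the S3 bucket: one key prefix per execution (objects/{eid}).
binaries = {}


def make_db():
    """Build a throwaway schema: workflow(wid, owner) and execution(eid, wid)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE workflow (wid INTEGER PRIMARY KEY, owner INTEGER)")
    conn.execute("CREATE TABLE execution (eid INTEGER PRIMARY KEY, wid INTEGER)")
    conn.executemany("INSERT INTO workflow VALUES (?, ?)", [(10, 7), (20, 8)])
    conn.executemany("INSERT INTO execution VALUES (?, ?)", [(1, 10), (2, 20)])
    conn.commit()
    binaries.update({"objects/1": ["blob-a"], "objects/2": ["blob-b"]})
    return conn


def delete_workflows(conn, user_id, wids):
    """Delete workflows transactionally; remove their binaries only after commit."""
    eids = []
    with conn:  # commits if the block succeeds, rolls back if it raises
        for wid in wids:
            row = conn.execute(
                "SELECT owner FROM workflow WHERE wid = ?", (wid,)
            ).fetchone()
            if row is None or row[0] != user_id:
                raise PermissionError(f"workflow {wid} is not owned by user {user_id}")
            eids += [
                r[0]
                for r in conn.execute("SELECT eid FROM execution WHERE wid = ?", (wid,))
            ]
            conn.execute("DELETE FROM execution WHERE wid = ?", (wid,))
            conn.execute("DELETE FROM workflow WHERE wid = ?", (wid,))
    # Reached only after a successful commit, so a rollback can never leave
    # surviving workflow rows that point at already-deleted binaries.
    for eid in eids:
        binaries.pop(f"objects/{eid}", None)
```

If any workflow id fails the ownership check, the whole transaction rolls back and `binaries` is left untouched.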
##########
amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto:
##########
@@ -252,6 +252,7 @@ message InitializeExecutorRequest {
int32 totalWorkerCount = 1;
core.OpExecInitInfo opExecInitInfo = 2;
bool isSource = 3;
+ core.ExecutionIdentity executionId = 4;
Review Comment:
This field tells the worker which execution it's in, so `create()` can put
the execution id in the S3 key. It works, but it makes large binaries the odd
one out.
Every other execution-scoped resource (results, state, console messages) is
named by the controller — which already has the execution id — and the finished
location is handed to the worker as data, so the worker never needs the id.
Large binaries are the exception: the worker builds the location itself in
`create()`, which is the only reason we now push the execution id down to
workers (this field, the handler change, and the per-engine id state all exist
just for that).
The consistent fix is to hand the worker a base location the same way the
controller already hands down port locations, and let `create()` append a
unique id — then this field and the worker-side state go away. (Large binaries
being created on the fly only stops the controller from pre-creating the
objects, not from handing down the location.)
This is bigger than a bug fix, so it's your call whether to do it now or in
a follow-up — what's here works fine. Flagging it because this approach puts
state on the worker that the cleaner design would later remove.
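To make the proposed shape concrete, here is a Python sketch under stated assumptions (`LargeBinaryLocation` and `location_for_execution` are hypothetical names, not existing Texera APIs): the controller, which already knows the execution id, builds the `objects/{eid}` prefix, and the worker's `create()` only appends a UUID, so no execution id has to live on the worker.

```python
import uuid
from dataclasses import dataclass

BUCKET = "texera-large-binaries"


@dataclass(frozen=True)
class LargeBinaryLocation:
    """Hypothetical base location the controller would hand to each worker."""

    base_uri: str

    def create(self) -> str:
        # The worker only appends a unique id; it never needs the execution id.
        return f"{self.base_uri}/{uuid.uuid4()}"


def location_for_execution(execution_id: int) -> LargeBinaryLocation:
    # Controller side: it already has the execution id, so it names the prefix.
    return LargeBinaryLocation(f"s3://{BUCKET}/objects/{execution_id}")
```

Cleanup is unchanged by this shape: deleting the `objects/{eid}` prefix still removes everything one execution wrote.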
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala:
##########
@@ -33,31 +33,63 @@ object LargeBinaryManager extends LazyLogging {
val DEFAULT_BUCKET: String = "texera-large-binaries"
/**
- * Creates a new LargeBinary reference.
+ * Worker-scoped execution context. It is set on the data-processing thread when an
+ * executor is initialized.
+ */
+ private val currentExecutionId: ThreadLocal[Option[Long]] =
Review Comment:
This whole mechanism depends on the execution id being set and read on the
same worker thread, but that requirement isn't written down anywhere. Please
state it plainly here — something like "must be set on, and read from, the
worker's data-processing thread." (Minor: the Python manager's docstring says
it "Mirrors the JVM object LargeBinaryManager"; the two actually use different
mechanisms — a thread-local here vs. a process-wide singleton there — so it's
worth noting they differ rather than match.)
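The failure mode is easy to see with Python's `threading.local`, used here as an analogue of the Scala `ThreadLocal` rather than the PR's code (the function names are illustrative): a value set on one thread is absent on another, so a `create()` running off the data-processing thread would find no execution id.

```python
import threading

# Analogue of ThreadLocal[Option[Long]]: every thread sees its own slot.
_current_execution_id = threading.local()


def set_current_execution_id(eid: int) -> None:
    # Must be called on the worker's data-processing thread.
    _current_execution_id.value = eid


def get_current_execution_id():
    # Returns None on any thread that never called the setter.
    return getattr(_current_execution_id, "value", None)


def read_on_other_thread():
    """Read the execution id from a freshly started thread."""
    seen = []
    t = threading.Thread(target=lambda: seen.append(get_current_execution_id()))
    t.start()
    t.join()
    return seen[0]
```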
##########
amber/src/test/python/pytexera/storage/test_large_binary_manager.py:
##########
@@ -152,3 +162,35 @@ def test_create_uses_default_bucket(self):
uri = large_binary_manager.create()
assert large_binary_manager.DEFAULT_BUCKET in uri
+ assert f"objects/{large_binary_manager.get_current_execution_id()}/" in uri
+
+
+def test_create_stamps_execution_id(monkeypatch):
+ # Avoid touching real S3 while testing key generation.
+ monkeypatch.setattr(
+ large_binary_manager, "_ensure_bucket_exists", lambda bucket: None
+ )
+ monkeypatch.setattr(large_binary_manager, "_current_execution_id", 42)
Review Comment:
This sets the private `_current_execution_id` directly, while the class
fixture above uses the public setter. Using the setter here too would be more
consistent (and safer if the internals change).
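Roughly what the test looks like when it goes through the setter. The manager below is a simplified, hypothetical stand-in for the PR's class with S3 stubbed out; the real test would keep its `monkeypatch` of `_ensure_bucket_exists`.

```python
import uuid


class LargeBinaryManager:
    """Simplified stand-in for the PR's manager; S3 access is stubbed out."""

    DEFAULT_BUCKET = "texera-large-binaries"

    def __init__(self):
        self._current_execution_id = None

    def set_current_execution_id(self, execution_id):
        self._current_execution_id = execution_id

    def get_current_execution_id(self):
        return self._current_execution_id

    def _ensure_bucket_exists(self, bucket):
        pass  # the real method talks to S3; the real test patches it out

    def create(self) -> str:
        self._ensure_bucket_exists(self.DEFAULT_BUCKET)
        eid = self._current_execution_id
        return f"s3://{self.DEFAULT_BUCKET}/objects/{eid}/{uuid.uuid4()}"


def test_create_stamps_execution_id():
    manager = LargeBinaryManager()
    # Public setter, matching the class fixture, instead of writing the private field.
    manager.set_current_execution_id(42)
    uri = manager.create()
    assert uri.startswith(f"s3://{manager.DEFAULT_BUCKET}/objects/42/")
```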
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala:
##########
@@ -44,6 +45,7 @@ trait InitializeExecutorHandler {
)
)
cachedTotalWorkerCount = req.totalWorkerCount
+ req.executionId.foreach(eid => LargeBinaryManager.setCurrentExecutionId(eid.id))
Review Comment:
The two engines disagree when the id is missing: here, `foreach` does
nothing and the previous value stays; the Python handler stores `None`, which
makes the next `create()` fail loudly. Worth making them behave the same. The
cleanest fix is to make the proto field required (`no_box`), which removes the
"missing id" case entirely and lets you drop the `Some(...)` wrapper at
`RegionExecutionCoordinator.scala:408`. (Relevant only if the current approach
stays — see the design note on the proto field.)
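If the field has to stay optional for now, another way to get matching behavior is to reject a missing id at initialization in both engines (on the Scala side, roughly `req.executionId.getOrElse(throw ...)`, the same pattern `create()` already uses). A Python sketch of that guard; `ExecutionContext` and the error message are hypothetical:

```python
class ExecutionContext:
    """Hypothetical holder showing one way both engines could treat a missing id alike."""

    def __init__(self):
        self._execution_id = None

    def set_current_execution_id(self, execution_id):
        # Fail at executor initialization, instead of keeping a stale id (Scala's
        # foreach) or deferring the error to the first create() (Python's None).
        if execution_id is None:
            raise ValueError("InitializeExecutorRequest is missing executionId")
        self._execution_id = execution_id

    @property
    def execution_id(self):
        return self._execution_id
```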
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala:
##########
@@ -33,31 +33,63 @@ object LargeBinaryManager extends LazyLogging {
val DEFAULT_BUCKET: String = "texera-large-binaries"
/**
- * Creates a new LargeBinary reference.
+ * Worker-scoped execution context. It is set on the data-processing thread when an
+ * executor is initialized.
+ */
+ private val currentExecutionId: ThreadLocal[Option[Long]] =
+ ThreadLocal.withInitial(() => Option.empty[Long])
+
+ /** Sets the execution id for large binaries created on the current thread. */
+ def setCurrentExecutionId(executionId: Long): Unit =
+ currentExecutionId.set(Some(executionId))
+
+ /**
+ * Creates a new LargeBinary reference scoped to the current execution.
* The actual data upload happens separately via LargeBinaryOutputStream.
*
- * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+ * @return S3 URI string for the new LargeBinary (format: s3://bucket/objects/{eid}/{uuid})
*/
def create(): String = {
- val objectKey = s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
- val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
-
- uri
+ val eid = currentExecutionId
+ .get()
+ .getOrElse(
+ throw new IllegalStateException(
+ "LargeBinaryManager.create() requires an execution context, " +
+ "but none was set on the current thread."
+ )
+ )
+ val objectKey = s"objects/$eid/${UUID.randomUUID()}"
+ s"s3://$DEFAULT_BUCKET/$objectKey"
}
/**
- * Deletes all large binaries from the bucket.
+ * Deletes all large binaries belonging to a single execution.
*
- * @throws java.lang.Exception if the deletion fails
- * @return Unit
+ * @param executionId the execution whose large binaries should be removed
+ */
+ def deleteByExecution(executionId: Long): Unit =
+ deleteByExecution(executionId, S3StorageClient.deleteDirectory)
+
+ /**
+ * Overload that takes the directory-delete operation as a parameter. Visible for
+ * testing
*/
- def deleteAllObjects(): Unit = {
+ private[util] def deleteByExecution(
+ executionId: Long,
+ deleteDir: (String, String) => Unit
+ ): Unit = {
try {
- S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
- logger.info(s"Successfully deleted all large binaries from bucket: $DEFAULT_BUCKET")
+ deleteDir(DEFAULT_BUCKET, s"objects/$executionId")
+ logger.info(
+ s"Deleted large binaries for execution $executionId from bucket: $DEFAULT_BUCKET"
+ )
} catch {
case e: Exception =>
Review Comment:
This catches every exception and logs a single WARN, so a real failure (bad
credentials, unreachable endpoint, partial delete) silently leaves storage
behind. If swallowing is intentional — so a cleanup hiccup doesn't fail the
workflow delete — that's reasonable, but please log at `error` and/or say so in
a comment, otherwise leaked binaries are invisible.
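A sketch of the intentional-swallow variant in Python (`delete_by_execution` mirrors the Scala overload's injected `deleteDir`; the logger name is an assumption): the failure is still caught, but `logger.exception` records it at ERROR level with the traceback, and a comment states why it is swallowed.

```python
import logging

logger = logging.getLogger("large_binary_cleanup")
DEFAULT_BUCKET = "texera-large-binaries"


def delete_by_execution(execution_id, delete_dir):
    """Best-effort cleanup: never fails the caller, but never hides a failure either."""
    prefix = f"objects/{execution_id}"
    try:
        delete_dir(DEFAULT_BUCKET, prefix)
        logger.info(
            "Deleted large binaries for execution %s from bucket %s",
            execution_id,
            DEFAULT_BUCKET,
        )
    except Exception:
        # Swallowed on purpose so a storage hiccup does not fail the workflow delete;
        # logged at ERROR with the traceback so leaked objects can be found later.
        logger.exception(
            "Failed to delete s3://%s/%s; objects may be leaked", DEFAULT_BUCKET, prefix
        )
```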
##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,94 @@
and LargeBinaryInputStream/LargeBinaryOutputStream instead.
"""
-import time
import uuid
from loguru import logger
from core.storage.storage_config import StorageConfig
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
+class LargeBinaryManager:
+ """Manages execution-scoped large binaries in S3 for a worker process.
-def _get_s3_client():
- """Get or initialize S3 client (lazy initialization, cached)."""
- global _s3_client
- if _s3_client is None:
- try:
- import boto3
- from botocore.config import Config
- except ImportError as e:
- raise RuntimeError("boto3 required. Install with: pip install boto3") from e
-
- _s3_client = boto3.client(
- "s3",
- endpoint_url=StorageConfig.S3_ENDPOINT,
- aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
- aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
- region_name=StorageConfig.S3_REGION,
- config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
- )
- return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
- """Ensure S3 bucket exists, creating it if necessary."""
- s3 = _get_s3_client()
- try:
- s3.head_bucket(Bucket=bucket)
- except s3.exceptions.NoSuchBucket:
- logger.debug(f"Bucket {bucket} not found, creating it")
- s3.create_bucket(Bucket=bucket)
- logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+ Implemented as a singleton: ``LargeBinaryManager()`` always returns the same
+ instance, so the cached S3 client and the current execution id are shared across
+ all callers in the worker process. A Python worker is a single process serving one
+ execution. Mirrors the JVM ``object LargeBinaryManager``.
"""
- Creates a new largebinary reference with a unique S3 URI.
- Returns:
- S3 URI string (format: s3://bucket/key)
- """
- _ensure_bucket_exists(DEFAULT_BUCKET)
- timestamp_ms = int(time.time() * 1000)
- unique_id = uuid.uuid4()
- object_key = f"objects/{timestamp_ms}/{unique_id}"
- return f"s3://{DEFAULT_BUCKET}/{object_key}"
+ DEFAULT_BUCKET = "texera-large-binaries"
Review Comment:
The bucket name "texera-large-binaries" is also written out in the Scala
manager (`LargeBinaryManager.scala:33`). If one changes and the other doesn't,
the two engines would quietly target different buckets — consider sourcing it
from `StorageConfig`. (Goes away with the design in the proto-field note.)
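One way to source it from config, sketched in Python. The attribute `LARGE_BINARY_BUCKET` and the environment variable `STORAGE_LARGE_BINARY_BUCKET` are hypothetical, and the Scala `StorageConfig` would need to read the same key for the two engines to stay in sync.

```python
import os


class StorageConfig:
    """Hypothetical excerpt: one shared setting for the large-binary bucket."""

    # The env var name is an assumption; the default keeps today's bucket name.
    LARGE_BINARY_BUCKET = os.environ.get(
        "STORAGE_LARGE_BINARY_BUCKET", "texera-large-binaries"
    )


class LargeBinaryManager:
    # Read from config instead of repeating the literal in each engine.
    DEFAULT_BUCKET = StorageConfig.LARGE_BINARY_BUCKET
```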
##########
amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala:
##########
@@ -200,7 +200,8 @@ class WorkerSpec
OpExecWithClassName(
"org.apache.texera.amber.engine.architecture.worker.DummyOperatorExecutor"
),
- isSource = false
+ isSource = false,
+ executionId = None
Review Comment:
`executionId = None` keeps this green but exercises a case production never
produces. Passing `Some(...)` would run the same path the real system does.
##########
amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py:
##########
@@ -26,7 +26,12 @@
class InitializeExecutorHandler(ControlHandler):
async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn:
+ from pytexera.storage.large_binary_manager import LargeBinaryManager
Review Comment:
Two small readability notes: the import is inside the function rather than
at the top — if that's to avoid a circular import, a one-line note would help;
otherwise move it up. And `req.execution_id.id if req.execution_id else None`
(next line) could use a short comment that an unset field arrives as `None`, so
the missing case fails loudly in `create()`.
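What the suggested comment could look like, in a self-contained sketch where stand-in dataclasses replace the generated proto types and the real manager (the claim that an unset field arrives as `None` is taken from the review, not verified here). In the real handler, the `LargeBinaryManager` import would sit at module level unless it creates a cycle.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExecutionIdentity:
    id: int


@dataclass
class InitializeExecutorRequest:
    # Stand-in for the generated message; an unset message field is None here.
    execution_id: Optional[ExecutionIdentity] = None


class LargeBinaryManager:
    """Minimal stand-in exposing only the setter used by the handler."""

    _current_execution_id = None

    def set_current_execution_id(self, execution_id):
        self._current_execution_id = execution_id


def initialize_executor(req: InitializeExecutorRequest, manager: LargeBinaryManager) -> None:
    # An unset execution_id arrives as None. Store it as-is so the first create()
    # fails loudly instead of writing under a stale or made-up id.
    manager.set_current_execution_id(req.execution_id.id if req.execution_id else None)
```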
##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,94 @@
and LargeBinaryInputStream/LargeBinaryOutputStream instead.
"""
-import time
import uuid
from loguru import logger
from core.storage.storage_config import StorageConfig
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
+class LargeBinaryManager:
+ """Manages execution-scoped large binaries in S3 for a worker process.
-def _get_s3_client():
- """Get or initialize S3 client (lazy initialization, cached)."""
- global _s3_client
- if _s3_client is None:
- try:
- import boto3
- from botocore.config import Config
- except ImportError as e:
- raise RuntimeError("boto3 required. Install with: pip install boto3") from e
-
- _s3_client = boto3.client(
- "s3",
- endpoint_url=StorageConfig.S3_ENDPOINT,
- aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
- aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
- region_name=StorageConfig.S3_REGION,
- config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
- )
- return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
- """Ensure S3 bucket exists, creating it if necessary."""
- s3 = _get_s3_client()
- try:
- s3.head_bucket(Bucket=bucket)
- except s3.exceptions.NoSuchBucket:
- logger.debug(f"Bucket {bucket} not found, creating it")
- s3.create_bucket(Bucket=bucket)
- logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+ Implemented as a singleton: ``LargeBinaryManager()`` always returns the same
+ instance, so the cached S3 client and the current execution id are shared across
+ all callers in the worker process. A Python worker is a single process serving one
+ execution. Mirrors the JVM ``object LargeBinaryManager``.
"""
- Creates a new largebinary reference with a unique S3 URI.
- Returns:
- S3 URI string (format: s3://bucket/key)
- """
- _ensure_bucket_exists(DEFAULT_BUCKET)
- timestamp_ms = int(time.time() * 1000)
- unique_id = uuid.uuid4()
- object_key = f"objects/{timestamp_ms}/{unique_id}"
- return f"s3://{DEFAULT_BUCKET}/{object_key}"
+ DEFAULT_BUCKET = "texera-large-binaries"
+
+ _instance = None
+
+ def __new__(cls):
+ if cls._instance is None:
+ instance = super().__new__(cls)
+ instance._s3_client = None
+ # Execution context: set at executor init and read by create() so the
+ # user-facing largebinary() API stays execution-id-free.
+ instance._current_execution_id = None
+ cls._instance = instance
+ return cls._instance
+
+ def set_current_execution_id(self, execution_id):
+ """Sets the execution id used to scope large binaries created by this worker."""
+ self._current_execution_id = execution_id
+
+ def get_current_execution_id(self):
+ """Returns the execution id set for this worker, or None if unset."""
+ return self._current_execution_id
+
+ def _get_s3_client(self):
+ """Get or initialize the S3 client (lazy initialization, cached)."""
+ if self._s3_client is None:
+ try:
+ import boto3
+ from botocore.config import Config
+ except ImportError as e:
+ raise RuntimeError(
+ "boto3 required. Install with: pip install boto3"
+ ) from e
+
+ self._s3_client = boto3.client(
+ "s3",
+ endpoint_url=StorageConfig.S3_ENDPOINT,
+ aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+ aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+ region_name=StorageConfig.S3_REGION,
+ config=Config(
+ signature_version="s3v4", s3={"addressing_style": "path"}
+ ),
+ )
+ return self._s3_client
+
+ def _ensure_bucket_exists(self, bucket: str):
+ """Ensure the S3 bucket exists, creating it if necessary."""
+ s3 = self._get_s3_client()
+ try:
+ s3.head_bucket(Bucket=bucket)
+ except s3.exceptions.NoSuchBucket:
+ logger.debug(f"Bucket {bucket} not found, creating it")
+ s3.create_bucket(Bucket=bucket)
+ logger.info(f"Created bucket: {bucket}")
+
+ def create(self) -> str:
+ """
+ Creates a new largebinary reference with a unique, execution-scoped S3 URI.
+
+ The object key is namespaced by the current execution id so cleanup can delete
+ only this execution's objects. The execution id is injected by the system (set
+ via set_current_execution_id() when the worker is initialized); callers never
+ pass it.
+
+ Returns:
+ S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid})
+ """
+ self._ensure_bucket_exists(self.DEFAULT_BUCKET)
Review Comment:
Small ordering thing: this calls `_ensure_bucket_exists` (an S3 round-trip)
before checking whether the execution id is set, so a worker with no id pays
for an S3 call before failing. Check the id first.
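A sketch of the reordered `create()` on a simplified, hypothetical stand-in that counts S3 round-trips instead of making them (the exception type is an assumption): the id check runs first, so a worker with no id fails without touching S3.

```python
import uuid


class LargeBinaryManager:
    """Stand-in that counts S3 round-trips instead of making them."""

    DEFAULT_BUCKET = "texera-large-binaries"

    def __init__(self):
        self._current_execution_id = None
        self.bucket_checks = 0

    def set_current_execution_id(self, execution_id):
        self._current_execution_id = execution_id

    def _ensure_bucket_exists(self, bucket):
        self.bucket_checks += 1  # the real method calls head_bucket on S3

    def create(self) -> str:
        # Validate local state first: a worker with no id fails before any network call.
        eid = self._current_execution_id
        if eid is None:
            raise RuntimeError(
                "LargeBinaryManager.create() requires an execution id; none was set."
            )
        self._ensure_bucket_exists(self.DEFAULT_BUCKET)
        return f"s3://{self.DEFAULT_BUCKET}/objects/{eid}/{uuid.uuid4()}"
```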
##########
common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala:
##########
@@ -466,6 +476,24 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
+ }
+
+ test("deleteByExecution removes only the target execution's binaries") {
Review Comment:
This is the test that actually proves the fix — good to have it. One small
thing: it leaves the worker's execution id set to 1002 when it exits and relies
on the next test's `beforeEach` to reset. Resetting in the `finally` would keep
it self-contained if someone adds a test below it.
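The same restore-on-exit idea, sketched in Python as a context manager (`execution_id_scope` and the tiny `Manager` stand-in are hypothetical). In the Scala spec the equivalent is a `try`/`finally` around the test body that resets the id the same way `beforeEach` does.

```python
from contextlib import contextmanager


class Manager:
    """Tiny stand-in exposing only the execution-id accessors."""

    def __init__(self):
        self._eid = None

    def set_current_execution_id(self, eid):
        self._eid = eid

    def get_current_execution_id(self):
        return self._eid


@contextmanager
def execution_id_scope(manager, eid):
    """Set an execution id for the duration of a test, restoring the old one afterward."""
    previous = manager.get_current_execution_id()
    manager.set_current_execution_id(eid)
    try:
        yield
    finally:
        # Runs even if an assertion inside the block fails.
        manager.set_current_execution_id(previous)
```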
--
This is an automated message from the Apache Git Service.