Yicong-Huang commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3337858354


##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,92 @@
 and LargeBinaryInputStream/LargeBinaryOutputStream instead.
 """
 
-import time
 import uuid
 from loguru import logger
 from core.storage.storage_config import StorageConfig
 
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
 
+class LargeBinaryManager:
+    """Manages execution-scoped large binaries in S3 for a worker process.
 
-def _get_s3_client():
-    """Get or initialize S3 client (lazy initialization, cached)."""
-    global _s3_client
-    if _s3_client is None:
-        try:
-            import boto3
-            from botocore.config import Config
-        except ImportError as e:
-            raise RuntimeError("boto3 required. Install with: pip install 
boto3") from e
-
-        _s3_client = boto3.client(
-            "s3",
-            endpoint_url=StorageConfig.S3_ENDPOINT,
-            aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
-            aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
-            region_name=StorageConfig.S3_REGION,
-            config=Config(signature_version="s3v4", s3={"addressing_style": 
"path"}),
-        )
-    return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
-    """Ensure S3 bucket exists, creating it if necessary."""
-    s3 = _get_s3_client()
-    try:
-        s3.head_bucket(Bucket=bucket)
-    except s3.exceptions.NoSuchBucket:
-        logger.debug(f"Bucket {bucket} not found, creating it")
-        s3.create_bucket(Bucket=bucket)
-        logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+    A Python worker is a single process serving one execution, so a single 
shared
+    instance (the module-level ``large_binary_manager``) holds the cached S3 
client
+    and the current execution id. Mirrors the JVM ``LargeBinaryManager``.
     """
-    Creates a new largebinary reference with a unique S3 URI.
 
-    Returns:
-        S3 URI string (format: s3://bucket/key)
-    """
-    _ensure_bucket_exists(DEFAULT_BUCKET)
-    timestamp_ms = int(time.time() * 1000)
-    unique_id = uuid.uuid4()
-    object_key = f"objects/{timestamp_ms}/{unique_id}"
-    return f"s3://{DEFAULT_BUCKET}/{object_key}"
+    DEFAULT_BUCKET = "texera-large-binaries"
+
+    def __init__(self):
+        self._s3_client = None
+        # Execution context: set at executor init and read by create() so the
+        # user-facing largebinary() API stays execution-id-free.
+        self._current_execution_id = None
+
+    def set_current_execution_id(self, execution_id):
+        """Sets the execution id used to scope large binaries created by this 
worker."""
+        self._current_execution_id = execution_id
+
+    def get_current_execution_id(self):
+        """Returns the execution id set for this worker, or None if unset."""
+        return self._current_execution_id
+
+    def _get_s3_client(self):
+        """Get or initialize the S3 client (lazy initialization, cached)."""
+        if self._s3_client is None:
+            try:
+                import boto3
+                from botocore.config import Config
+            except ImportError as e:
+                raise RuntimeError(
+                    "boto3 required. Install with: pip install boto3"
+                ) from e
+
+            self._s3_client = boto3.client(
+                "s3",
+                endpoint_url=StorageConfig.S3_ENDPOINT,
+                aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+                aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+                region_name=StorageConfig.S3_REGION,
+                config=Config(
+                    signature_version="s3v4", s3={"addressing_style": "path"}
+                ),
+            )
+        return self._s3_client
+
+    def _ensure_bucket_exists(self, bucket: str):
+        """Ensure the S3 bucket exists, creating it if necessary."""
+        s3 = self._get_s3_client()
+        try:
+            s3.head_bucket(Bucket=bucket)
+        except s3.exceptions.NoSuchBucket:
+            logger.debug(f"Bucket {bucket} not found, creating it")
+            s3.create_bucket(Bucket=bucket)
+            logger.info(f"Created bucket: {bucket}")
+
+    def create(self) -> str:
+        """
+        Creates a new largebinary reference with a unique, execution-scoped S3 
URI.
+
+        The object key is namespaced by the current execution id so cleanup 
can delete
+        only this execution's objects. The execution id is injected by the 
system (set
+        via set_current_execution_id() when the worker is initialized); 
callers never
+        pass it.
+
+        Returns:
+            S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid})
+        """
+        self._ensure_bucket_exists(self.DEFAULT_BUCKET)
+        execution_id = self.get_current_execution_id()
+        if execution_id is None:
+            raise RuntimeError(
+                "largebinary() requires an execution context, but no execution 
id "
+                "has been set for this worker."
+            )
+        unique_id = uuid.uuid4()
+        object_key = f"objects/{execution_id}/{unique_id}"
+        return f"s3://{self.DEFAULT_BUCKET}/{object_key}"
+
+
+# Shared singleton for the worker process. Consumers import this instance:
+#   from pytexera.storage.large_binary_manager import large_binary_manager
+large_binary_manager = LargeBinaryManager()

Review Comment:
   OK, I read more on this:
   
   https://www.thepythoncodingstack.com/p/creating-a-singleton-class-in-python
   
   we can use two ways to create singleton:
   1. a full singleton public class with `__new__` to guard creation of such a 
class. Callsites can import this class and it will be used as a singleton.
   2. Use a module-level variable to create a single instance of a class, and
let call sites import only that instance. (This is what your current
implementation does.) This is OK for internal APIs and is sometimes preferable
in Python. But we need to make sure call sites do not import the class.
In other words, we need to make `class LargeBinaryManager` private: `class
_LargeBinaryManager`, and only expose the instance.
   
   I would personally prefer the first one, but as this is used internally,
either way is fine. You can decide which way to go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to