kevinjqliu commented on code in PR #701:
URL: https://github.com/apache/iceberg-python/pull/701#discussion_r1590382950


##########
pyiceberg/catalog/hive.py:
##########
@@ -111,6 +122,13 @@
 HIVE2_COMPATIBLE = "hive.hive2-compatible"
 HIVE2_COMPATIBLE_DEFAULT = False
 
+DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 2
+DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 30
+DEFAULT_LOCK_CHECK_RETRIES = 5
+DEFAULT_LOCK_CHECK_MULTIPLIER = 2

Review Comment:
   What do you think about grouping these configs into `TableProperties`,
   along with their default values?
   
   
https://github.com/apache/iceberg-python/blob/7bd5d9e6c32bcc5b46993d6bfaeed50471e972ae/pyiceberg/table/__init__.py#L200
   



##########
tests/integration/test_reads.py:
##########
@@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) 
-> None:
                 
table.transaction().set_properties(lock="fail").commit_transaction()
         finally:
             open_client.unlock(UnlockRequest(lock.lockid))
+
+
+@pytest.mark.integration
+def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
+    table = create_table(session_catalog_hive)
+    database_name: str
+    table_name: str
+    _, database_name, table_name = table.identifier
+
+    hive_client: _HiveClient = 
_HiveClient(session_catalog_hive.properties["uri"])
+
+    executor = ExecutorFactory.get_or_create()
+
+    with hive_client as open_client:
+
+        def another_task() -> None:
+            lock1: LockResponse = 
open_client.lock(session_catalog_hive._create_lock_request(database_name, 
table_name))
+            time.sleep(5)

Review Comment:
   It might be easier to mock the `lock` and `check_lock` functions instead of
   relying on the timing of the function calls.



##########
tests/integration/test_reads.py:
##########
@@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) 
-> None:
                 
table.transaction().set_properties(lock="fail").commit_transaction()
         finally:
             open_client.unlock(UnlockRequest(lock.lockid))
+
+
+@pytest.mark.integration
+def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
+    table = create_table(session_catalog_hive)
+    database_name: str
+    table_name: str
+    _, database_name, table_name = table.identifier
+
+    hive_client: _HiveClient = 
_HiveClient(session_catalog_hive.properties["uri"])
+
+    executor = ExecutorFactory.get_or_create()
+
+    with hive_client as open_client:
+
+        def another_task() -> None:
+            lock1: LockResponse = 
open_client.lock(session_catalog_hive._create_lock_request(database_name, 
table_name))
+            time.sleep(5)

Review Comment:
   
https://stackoverflow.com/questions/47906671/python-retry-with-tenacity-disable-wait-for-unittest
   
   This might be helpful for overriding the wait behavior of the retry in tests.



##########
tests/integration/test_reads.py:
##########
@@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) 
-> None:
                 
table.transaction().set_properties(lock="fail").commit_transaction()
         finally:
             open_client.unlock(UnlockRequest(lock.lockid))
+
+
+@pytest.mark.integration
+def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
+    table = create_table(session_catalog_hive)
+    database_name: str
+    table_name: str
+    _, database_name, table_name = table.identifier
+
+    hive_client: _HiveClient = 
_HiveClient(session_catalog_hive.properties["uri"])
+
+    executor = ExecutorFactory.get_or_create()
+
+    with hive_client as open_client:
+
+        def another_task() -> None:
+            lock1: LockResponse = 
open_client.lock(session_catalog_hive._create_lock_request(database_name, 
table_name))
+            time.sleep(5)

Review Comment:
   Also, maybe add a test case for when `_wait_for_lock` fails to acquire the
   lock after all retries.



##########
tests/integration/test_reads.py:
##########
@@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) 
-> None:
                 
table.transaction().set_properties(lock="fail").commit_transaction()
         finally:
             open_client.unlock(UnlockRequest(lock.lockid))
+
+
+@pytest.mark.integration
+def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
+    table = create_table(session_catalog_hive)
+    database_name: str
+    table_name: str
+    _, database_name, table_name = table.identifier
+
+    hive_client: _HiveClient = 
_HiveClient(session_catalog_hive.properties["uri"])
+
+    executor = ExecutorFactory.get_or_create()
+
+    with hive_client as open_client:
+
+        def another_task() -> None:
+            lock1: LockResponse = 
open_client.lock(session_catalog_hive._create_lock_request(database_name, 
table_name))
+            time.sleep(5)

Review Comment:
   nit: `time.sleep` in a test is typically an anti-pattern; this will add at
   least 5 seconds to the test suite in the future.
   
   



##########
pyiceberg/catalog/hive.py:
##########
@@ -356,6 +378,28 @@ def _create_lock_request(self, database_name: str, 
table_name: str) -> LockReque
 
         return lock_request
 
+    def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, 
open_client: Client) -> LockResponse:
+        @retry(
+            retry=retry_if_exception_type(WaitingForLockException),
+            wait=wait_exponential(
+                multiplier=self._lock_check_multiplier, 
min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time
+            ),
+            stop=stop_after_attempt(self._lock_check_retries),
+            reraise=True,
+        )
+        def _do_wait_for_lock() -> LockResponse:
+            response: LockResponse = 
open_client.check_lock(CheckLockRequest(lockid=lockid))
+            if response.state == LockState.ACQUIRED:
+                return response
+            elif response.state == LockState.WAITING:
+                msg = f"Wait on lock for {database_name}:{table_name}"

Review Comment:
   nit: use `database_name.table_name` (dot-separated) instead of
   `database_name:table_name` in this message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to