kevinjqliu commented on code in PR #701: URL: https://github.com/apache/iceberg-python/pull/701#discussion_r1590382950
########## pyiceberg/catalog/hive.py: ########## @@ -111,6 +122,13 @@ HIVE2_COMPATIBLE = "hive.hive2-compatible" HIVE2_COMPATIBLE_DEFAULT = False +DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 2 +DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 30 +DEFAULT_LOCK_CHECK_RETRIES = 5 +DEFAULT_LOCK_CHECK_MULTIPLIER = 2 Review Comment: wdyt about grouping these configs into TableProperties, along with their default value https://github.com/apache/iceberg-python/blob/7bd5d9e6c32bcc5b46993d6bfaeed50471e972ae/pyiceberg/table/__init__.py#L200 ########## tests/integration/test_reads.py: ########## @@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None: table.transaction().set_properties(lock="fail").commit_transaction() finally: open_client.unlock(UnlockRequest(lock.lockid)) + + +@pytest.mark.integration +def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None: + table = create_table(session_catalog_hive) + database_name: str + table_name: str + _, database_name, table_name = table.identifier + + hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"]) + + executor = ExecutorFactory.get_or_create() + + with hive_client as open_client: + + def another_task() -> None: + lock1: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name)) + time.sleep(5) Review Comment: It might be easier to mock `lock` and `check_lock` functions instead of relying on the timing of the function calls ########## tests/integration/test_reads.py: ########## @@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None: table.transaction().set_properties(lock="fail").commit_transaction() finally: open_client.unlock(UnlockRequest(lock.lockid)) + + +@pytest.mark.integration +def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None: + table = create_table(session_catalog_hive) + database_name: str + table_name: str + _, database_name, table_name = table.identifier + + 
hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"]) + + executor = ExecutorFactory.get_or_create() + + with hive_client as open_client: + + def another_task() -> None: + lock1: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name)) + time.sleep(5) Review Comment: https://stackoverflow.com/questions/47906671/python-retry-with-tenacity-disable-wait-for-unittest This might be helpful for overriding the wait behavior of `retry` in tests. ########## tests/integration/test_reads.py: ########## @@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None: table.transaction().set_properties(lock="fail").commit_transaction() finally: open_client.unlock(UnlockRequest(lock.lockid)) + + +@pytest.mark.integration +def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None: + table = create_table(session_catalog_hive) + database_name: str + table_name: str + _, database_name, table_name = table.identifier + + hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"]) + + executor = ExecutorFactory.get_or_create() + + with hive_client as open_client: + + def another_task() -> None: + lock1: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name)) + time.sleep(5) Review Comment: Also, maybe add a test case for when `_wait_for_lock` fails to acquire the lock after all retries. ########## tests/integration/test_reads.py: ########## @@ -506,3 +508,40 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None: table.transaction().set_properties(lock="fail").commit_transaction() finally: open_client.unlock(UnlockRequest(lock.lockid)) + + +@pytest.mark.integration +def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None: + table = create_table(session_catalog_hive) + database_name: str + table_name: str + _, database_name, table_name = table.identifier + + hive_client: _HiveClient = 
_HiveClient(session_catalog_hive.properties["uri"]) + + executor = ExecutorFactory.get_or_create() + + with hive_client as open_client: + + def another_task() -> None: + lock1: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name)) + time.sleep(5) Review Comment: nit: `time.sleep` in tests is typically an anti-pattern; this one will add at least 5 seconds to every run of the test suite from now on. ########## pyiceberg/catalog/hive.py: ########## @@ -356,6 +378,28 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque return lock_request + def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse: + @retry( + retry=retry_if_exception_type(WaitingForLockException), + wait=wait_exponential( + multiplier=self._lock_check_multiplier, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time + ), + stop=stop_after_attempt(self._lock_check_retries), + reraise=True, + ) + def _do_wait_for_lock() -> LockResponse: + response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid)) + if response.state == LockState.ACQUIRED: + return response + elif response.state == LockState.WAITING: + msg = f"Wait on lock for {database_name}:{table_name}" Review Comment: nit: format this as `{database_name}.{table_name}` (dot-separated) instead of using `:`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org