QlikFrederic commented on issue #2409:
URL: https://github.com/apache/iceberg-python/issues/2409#issuecomment-3245873409
Hi, adding `table = table.refresh()` doesn't work with the in-memory catalog (it then seems to start looking for a SQL catalog instead). So I've adapted the script to use a SQL catalog (and to load the table from the catalog at the start of the `expire_oldest_snapshots` function):

- Running 1 thread for snapshot expiration (commenting out the second thread): runs fine.
- Running 2 threads: table1's snapshots expire successfully, but table2 fails with snapshot IDs not found (since it is trying to delete snapshots of table1).
- Adding `table.refresh()` to the `expire_oldest_snapshots` function gives the same result (although this time it is table2 that succeeds and table1 that fails while deleting a snapshot of table2).
```
from pyiceberg.catalog.sql import SqlCatalog
from datetime import datetime, timezone
import polars as pl
import os
import threading
import traceback
import time
import random
import shutil


def cleanup_workspace():
    """Clean up the iceberg workspace before starting"""
    try:
        shutil.rmtree("/tmp/iceberg", ignore_errors=True)
        # Recreate the directory for the SQLite catalog
        os.makedirs("/tmp/iceberg", exist_ok=True)
        print("Cleaned up /tmp/iceberg workspace")
    except Exception as e:
        print(f"Warning: Could not clean workspace: {e}")


def generate_df(batch_id=0):
    df = pl.DataFrame(
        {
            "event_type": ["playback"] * 1000,
            "event_origin": [f"origin{random.randint(1, 5)}"] * 1000,
            "event_send_at": [datetime.now(timezone.utc)] * 1000,
            "event_saved_at": [datetime.now(timezone.utc)] * 1000,
            "data": [
                {
                    "calendarKey": f"calendarKey-{batch_id}",
                    "id": str(i + batch_id * 1000),
                    "referenceId": f"ref-{batch_id}-{i}",
                }
                for i in range(1000)
            ],
        }
    )
    return df


# Clean up workspace before starting
cleanup_workspace()

df = generate_df()
catalog = SqlCatalog(
    "default",
    warehouse="/tmp/iceberg",
    uri="sqlite:////tmp/iceberg/catalog.db",
)
catalog.create_namespace("default")
table1 = catalog.create_table(
    "default.table1", schema=df.to_arrow().schema,
    location="/tmp/iceberg/table1"
)
table2 = catalog.create_table(
    "default.table2", schema=df.to_arrow().schema,
    location="/tmp/iceberg/table2"
)


# Function to add multiple commits to a table
def add_commits_to_table(table, table_name, num_commits=5):
    print(f"Adding {num_commits} commits to {table_name}")
    for i in range(num_commits):
        df_batch = generate_df(batch_id=i)
        table.append(df_batch.to_arrow())
        print(f"  Added commit {i + 1} to {table_name}")
        time.sleep(0.1)  # Small delay between commits


# Add multiple commits to both tables
print("Creating multiple snapshots...")
add_commits_to_table(table1, "table1")
add_commits_to_table(table2, "table2")
print("Tables created, starting expire operations...")


# Function to expire the oldest 3 snapshots in a thread
def expire_oldest_snapshots(table_name):
    try:
        print(f"{table_name}: Starting expire operation...")
        # Reload the table from the catalog in this thread
        table = catalog.load_table(f"default.{table_name}")
        # Get a fresh snapshot list
        snapshots = list(table.snapshots())
        print(f"{table_name}: Current snapshots: {[s.snapshot_id for s in snapshots]}")
        if len(snapshots) <= 3:
            print(f"{table_name}: Not enough snapshots to expire 3 (only {len(snapshots)})")
            return
        # Find the oldest 3 snapshots
        oldest_snapshots = snapshots[:3]
        oldest_ids = [snapshot.snapshot_id for snapshot in oldest_snapshots]
        print(f"{table_name}: Found {len(snapshots)} snapshots, attempting to expire oldest 3: {oldest_ids}")
        # Verify the snapshots still exist before expiring
        current_ids = [s.snapshot_id for s in snapshots]
        for oid in oldest_ids:
            if oid not in current_ids:
                print(f"{table_name}: Snapshot {oid} no longer exists, skipping")
                return
        # Expire the oldest 3 snapshots by ID
        table = table.refresh()
        with table.maintenance.expire_snapshots() as expire:
            for snapshot_id in oldest_ids:
                expire.by_id(snapshot_id)
        print(f"{table_name}: Successfully expired snapshots {oldest_ids}")
    except Exception as e:
        print(f"{table_name}: Error expiring snapshots: {e}")
        traceback.print_exc()


# Run expire_snapshots in parallel threads
print("\nRunning expire_snapshots in parallel threads...")
thread1 = threading.Thread(target=expire_oldest_snapshots, args=("table1",))
thread2 = threading.Thread(target=expire_oldest_snapshots, args=("table2",))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("\nDone!")
```
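
For completeness, one variant that might help isolate the problem: give each thread its own `SqlCatalog` instance, so no catalog or table state is shared across threads. This is just a sketch, not something I've verified; `expire_with_fresh_catalog` is a hypothetical helper, and I'm assuming the same `maintenance.expire_snapshots()` API as in the script above:

```
def expire_with_fresh_catalog(table_name):
    # Hypothetical workaround sketch: construct a thread-local catalog
    # instance so nothing is shared with the other thread.
    local_catalog = SqlCatalog(
        "default",
        warehouse="/tmp/iceberg",
        uri="sqlite:////tmp/iceberg/catalog.db",
    )
    table = local_catalog.load_table(f"default.{table_name}")
    snapshots = list(table.snapshots())
    if len(snapshots) <= 3:
        print(f"{table_name}: Not enough snapshots to expire 3")
        return
    # Same expire call as above, just against the thread-local table
    with table.maintenance.expire_snapshots() as expire:
        for snapshot in snapshots[:3]:
            expire.by_id(snapshot.snapshot_id)
    print(f"{table_name}: expired {[s.snapshot_id for s in snapshots[:3]]}")
```

If the two threads still expire each other's snapshots even with fully separate catalog objects, that would point at shared state somewhere below the catalog instance rather than at the shared `catalog` variable in my script.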