QlikFrederic commented on issue #2409:
URL: https://github.com/apache/iceberg-python/issues/2409#issuecomment-3245873409

   Hi, adding `table = table.refresh()` doesn't work with the in-memory catalog (it then appears to go looking for a SQL catalog instead).
   So I've adapted the script to use a SQL catalog (and to load the table from the catalog at the start of the `expire_oldest_snapshots` function):
   - Running 1 thread for snapshot expiration (with the second thread commented out): runs fine.
   - Running 2 threads: table1's snapshots expire successfully, but table2 fails with snapshot ids not found, since it is trying to delete snapshots of table1.
   - Adding `table.refresh()` to the `expire_oldest_snapshots` function gives the same result, although this time it is table2 that succeeds and table1 that fails while deleting a snapshot of table2.
   
   ```
   from pyiceberg.catalog.sql import SqlCatalog
   from datetime import datetime, timezone
   import polars as pl
   import os
   import threading
   import traceback
   import time
   import random
   import shutil
   
   def cleanup_workspace():
       """Clean up the iceberg workspace before starting"""
       try:
           shutil.rmtree("/tmp/iceberg", ignore_errors=True)
           # Recreate the directory for SQLite catalog
           os.makedirs("/tmp/iceberg", exist_ok=True)
           print("Cleaned up /tmp/iceberg workspace")
       except Exception as e:
           print(f"Warning: Could not clean workspace: {e}")
   
   def generate_df(batch_id=0):
       df = pl.DataFrame(
           {
               "event_type": ["playback"] * 1000,
               "event_origin": [f"origin{random.randint(1, 5)}"] * 1000,
               "event_send_at": [datetime.now(timezone.utc)] * 1000,
               "event_saved_at": [datetime.now(timezone.utc)] * 1000,
               "data": [
                   {
                       "calendarKey": f"calendarKey-{batch_id}",
                       "id": str(i + batch_id * 1000),
                       "referenceId": f"ref-{batch_id}-{i}",
                   }
                   for i in range(1000)
               ],
           }
       )
       return df
   
   # Clean up workspace before starting
   cleanup_workspace()
   
   df = generate_df()
   catalog = SqlCatalog(
       "default",
       warehouse="/tmp/iceberg",
       uri="sqlite:////tmp/iceberg/catalog.db"
   )
   catalog.create_namespace("default")
    table1 = catalog.create_table(
        "default.table1", schema=df.to_arrow().schema, location="/tmp/iceberg/table1"
    )
   
    table2 = catalog.create_table(
        "default.table2", schema=df.to_arrow().schema, location="/tmp/iceberg/table2"
    )
   
   # Function to add multiple commits to a table
   def add_commits_to_table(table, table_name, num_commits=5):
       print(f"Adding {num_commits} commits to {table_name}")
       for i in range(num_commits):
           df_batch = generate_df(batch_id=i)
           table.append(df_batch.to_arrow())
           print(f"  Added commit {i+1} to {table_name}")
           time.sleep(0.1)  # Small delay between commits
   
   # Add multiple commits to both tables
   print("Creating multiple snapshots...")
   add_commits_to_table(table1, "table1")
   add_commits_to_table(table2, "table2")
   
   print("Tables created, starting expire operations...")
   
   # Function to expire oldest 3 snapshots in a thread
   def expire_oldest_snapshots(table_name):
       try:
           print(f"{table_name}: Starting expire operation...")
           
           # Reload table from catalog in this thread
           table = catalog.load_table(f"default.{table_name}")
           
           # Get all snapshots - get fresh snapshot list
           snapshots = list(table.snapshots())
           print(f"{table_name}: Current snapshots: {[s.snapshot_id for s in 
snapshots]}")
           
           if len(snapshots) <= 3:
               print(f"{table_name}: Not enough snapshots to expire 3 (only 
{len(snapshots)})")
               return
           
           # Find the oldest 3 snapshots
           oldest_snapshots = snapshots[:3]
           oldest_ids = [snapshot.snapshot_id for snapshot in oldest_snapshots]
           
           print(f"{table_name}: Found {len(snapshots)} snapshots, attempting 
to expire oldest 3: {oldest_ids}")
           
           # Verify the snapshots still exist before expiring
           current_ids = [s.snapshot_id for s in snapshots]
           for oid in oldest_ids:
               if oid not in current_ids:
                   print(f"{table_name}: Snapshot {oid} no longer exists, 
skipping")
                   return
   
           # Expire the oldest 3 snapshots by IDs
           table = table.refresh()
           with table.maintenance.expire_snapshots() as expire:
                for snapshot_id in oldest_ids:
                    expire.by_id(snapshot_id)
           
           print(f"{table_name}: Successfully expired snapshots {oldest_ids}")
           
       except Exception as e:
           print(f"{table_name}: Error expiring snapshots: {e}")
           traceback.print_exc()
   
   # Run expire_snapshots in parallel threads
   print("\nRunning expire_snapshots in parallel threads...")
   thread1 = threading.Thread(target=expire_oldest_snapshots, args=("table1",))
   thread2 = threading.Thread(target=expire_oldest_snapshots, args=("table2",))
   
   thread1.start()
   thread2.start()
   
   thread1.join()
   thread2.join()
   
   print("\nDone!")
   ```
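   
   As a side note, here is an untested sketch of a mitigation, assuming the root cause is state shared between the two threads: give each thread its own `SqlCatalog` instance and serialize the expire step behind a `threading.Lock`, so the two maintenance operations can never interleave. The `expire_oldest_snapshots_isolated` name is hypothetical; the body only reuses calls already present in the script above.
   
   ```
    import threading
    
    from pyiceberg.catalog.sql import SqlCatalog
    
    # Sketch only: avoid sharing catalog state across threads and
    # serialize expiration. Not verified to fix the issue above.
    _expire_lock = threading.Lock()
    
    def expire_oldest_snapshots_isolated(table_name):
        # Fresh catalog per thread instead of the shared module-level one.
        local_catalog = SqlCatalog(
            "default",
            warehouse="/tmp/iceberg",
            uri="sqlite:////tmp/iceberg/catalog.db",
        )
        table = local_catalog.load_table(f"default.{table_name}")
    
        snapshots = list(table.snapshots())
        if len(snapshots) <= 3:
            return
        oldest_ids = [s.snapshot_id for s in snapshots[:3]]
    
        # Serialize the actual expiration so the two tables' maintenance
        # operations cannot overlap in time.
        with _expire_lock:
            table = table.refresh()
            with table.maintenance.expire_snapshots() as expire:
                for snapshot_id in oldest_ids:
                    expire.by_id(snapshot_id)
        print(f"{table_name}: expired {oldest_ids}")
   ```
   
   If the lock alone made the failure disappear, that would point at shared mutable state in the expire path rather than at how each thread loads its table.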
   

