QlikFrederic opened a new issue, #2409:
URL: https://github.com/apache/iceberg-python/issues/2409
### Apache Iceberg version
main (development)
### Please describe the bug 🐞
When expiring snapshots on two Iceberg tables from separate threads (the `maintenance.expire_snapshots()` task),
the commit step tries to commit one table's snapshot expiration to the other table,
resulting in an error (snapshot does not exist).
Script to reproduce the issue:
```python
from pyiceberg.catalog.memory import InMemoryCatalog
from datetime import datetime, timezone
import polars as pl
import threading
import time
import random
def generate_df(batch_id=0):
    """Build a 1000-row polars DataFrame of synthetic "playback" events.

    Args:
        batch_id: Batch number embedded in every row's ``data`` entry so rows
            from different batches are distinguishable. The ``id`` values for
            batch ``b`` are ``str(b * 1000)`` through ``str(b * 1000 + 999)``.

    Returns:
        A ``pl.DataFrame`` with columns ``event_type``, ``event_origin``,
        ``event_send_at``, ``event_saved_at`` and ``data`` (one dict per row).
    """
    # Scalar columns are repeated for all 1000 rows; a single random origin
    # is picked per batch, not per row.
    df = pl.DataFrame(
        {
            "event_type": ["playback"] * 1000,
            "event_origin": [f"origin{random.randint(1, 5)}"] * 1000,
            "event_send_at": [datetime.now(timezone.utc)] * 1000,
            "event_saved_at": [datetime.now(timezone.utc)] * 1000,
            "data": [
                {
                    "calendarKey": f"calendarKey-{batch_id}",
                    "id": str(i + batch_id * 1000),
                    "referenceId": f"ref-{batch_id}-{i}",
                }
                for i in range(1000)
            ],
        }
    )
    return df
df = generate_df()
# Both tables share the schema of a sample batch; compute it once.
arrow_schema = df.to_arrow().schema

catalog = InMemoryCatalog("default", warehouse="/tmp/iceberg")
catalog.create_namespace_if_not_exists("default")

# Create (or load) two independent tables, each at its own location.
# The generator is unpacked left to right, so table1 is created before table2.
table1, table2 = (
    catalog.create_table_if_not_exists(
        identifier, schema=arrow_schema, location=table_location
    )
    for identifier, table_location in (
        ("default.table1", "/tmp/iceberg/table1"),
        ("default.table2", "/tmp/iceberg/table2"),
    )
)
# Function to add multiple commits to a table
def add_commits_to_table(table, table_name, num_commits=5):
    """Append ``num_commits`` freshly generated batches to ``table``, one append each.

    Args:
        table: Table to append to; each call receives ``generate_df(...).to_arrow()``.
        table_name: Label used only in the printed progress messages.
        num_commits: Number of batches to append (batch ids ``0..num_commits-1``).
    """
    print(f"Adding {num_commits} commits to {table_name}")
    for i in range(num_commits):
        df_batch = generate_df(batch_id=i)
        table.append(df_batch.to_arrow())
        print(f" Added commit {i+1} to {table_name}")
        time.sleep(0.2)  # Small delay between commits
# Populate both tables sequentially (no threads yet) so each has several snapshots.
print("Creating multiple snapshots...")
for tbl, tbl_name in ((table1, "table1"), (table2, "table2")):
    add_commits_to_table(tbl, tbl_name)
# Function to expire the oldest snapshots of a table; run once per thread below.
def expire_oldest_snapshots(table, table_name, num_to_expire=3):
    """Expire the ``num_to_expire`` oldest snapshots of ``table``, one commit per snapshot.

    Any exception is printed rather than raised, so each thread reports its own
    outcome; in the threaded run below, this is where the cross-table
    "snapshot does not exist" error shows up.

    Args:
        table: Table whose snapshots are expired.
        table_name: Label used only in the printed messages.
        num_to_expire: How many of the first-listed snapshots to expire (default 3).
    """
    try:
        snapshots = list(table.snapshots())
        # Require strictly more snapshots than we expire, so at least one survives.
        if len(snapshots) <= num_to_expire:
            print(f"{table_name}: Not enough snapshots to expire {num_to_expire} (only {len(snapshots)})")
            return
        # NOTE(review): assumes table.snapshots() lists snapshots oldest-first — confirm.
        oldest_ids = [snapshot.snapshot_id for snapshot in snapshots[:num_to_expire]]
        print(f"{table_name}: Found {len(snapshots)} snapshots, expiring oldest {num_to_expire}: {oldest_ids}")
        # One expire-snapshots commit per id; `snapshot_id` avoids shadowing builtin `id`.
        for snapshot_id in oldest_ids:
            table.maintenance.expire_snapshots().by_id(snapshot_id).commit()
        # Equivalent single-commit variant that also reproduces the issue:
        # table.maintenance.expire_snapshots().by_ids(oldest_ids).commit()
        print(f"{table_name}: Successfully expired snapshots {oldest_ids}")
    except Exception as e:
        # Deliberately broad: this repro wants whatever the concurrent commit
        # raises to be printed instead of silently killing the thread.
        print(f"{table_name}: Error expiring snapshots: {e}")
# Run expire_snapshots concurrently: one worker thread per table.
print("\nRunning expire_snapshots in parallel threads...")
workers = [
    threading.Thread(target=expire_oldest_snapshots, args=(tbl, tbl_name))
    for tbl, tbl_name in ((table1, "table1"), (table2, "table2"))
]
# Start every worker before joining any, so the two expirations overlap.
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
print("\nDone!")
```
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]