kevinjqliu commented on issue #2325: URL: https://github.com/apache/iceberg-python/issues/2325#issuecomment-3193881084
Thanks for creating the issue. The repro script is super helpful.

This is my current hypothesis: I think what you're observing is a side effect of the write pattern and its interaction with the `_manifests` LRU cache. `append`ing the dataframe here will always create a new manifest entry, since we use the ["fast append"](https://iceberg.apache.org/spec/#snapshots:~:text=is%20called%20a%20%E2%80%9C-,fast%20append,-%E2%80%9D.) strategy. This means each write will generate a new manifest list. `_manifests`, with its LRU cache, will cache each new manifest list's path along with the tuple of `ManifestFile` objects it references. Because we're constantly adding new manifest files through append, the cached manifest list paths hold overlapping `ManifestFile` objects. With an LRU cache of size 128, we cache 128 manifest list paths: the first cached item holds a tuple of 1 `ManifestFile`, and the 128th cached item holds a tuple of 128 `ManifestFile`s. Most of these `ManifestFile` objects refer to the same underlying manifest file, but each is a new Python object.

```
# cache key: value, str: tuple[ManifestFile]
1: (ManifestFile1)
2: (ManifestFile1, ManifestFile2)
3: (ManifestFile1, ManifestFile2, ManifestFile3)
...
n: (ManifestFile1, ..., ManifestFilen-1, ManifestFilen)
```

Each of these `ManifestFile` objects holds on to the data read by the avro reader. This is the top allocation, even above the avro reader objects.
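To make the growth pattern concrete, here's a minimal self-contained sketch. This is not PyIceberg's actual code: `read_manifest_list` and the bare-bones `ManifestFile` class below are hypothetical stand-ins for `_manifests` and `pyiceberg.manifest.ManifestFile`.

```python
from functools import lru_cache

class ManifestFile:
    """Hypothetical stand-in for pyiceberg.manifest.ManifestFile."""
    def __init__(self, path: str) -> None:
        self.path = path

created = []  # track every object the reader allocates

@lru_cache(maxsize=128)
def read_manifest_list(path: str, num_manifests: int) -> tuple:
    # Each cache miss materializes fresh ManifestFile objects, even for
    # manifests that earlier cache entries already hold copies of.
    files = tuple(ManifestFile(f"manifest-{i}.avro") for i in range(num_manifests))
    created.extend(files)
    return files

# Simulate 128 fast appends: the n-th manifest list references n manifests.
for n in range(1, 129):
    read_manifest_list(f"snap-{n}.avro", n)

print(len(created))  # 8256 = 1 + 2 + ... + 128, for only 128 manifest files on disk
```

The cache holds 128 entries, but because consecutive tuples overlap, the number of `ManifestFile` objects they pin grows quadratically in the number of appends (until eviction kicks in).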
I believe these are just `ManifestFile` objects:

```
/Users/kevinliu/repos/iceberg-python/pyiceberg/typedef.py:183: size=1146 KiB, count=12750, average=92 B
```

Here's a modified repro script:

```python
from pyiceberg.catalog.memory import InMemoryCatalog
import tracemalloc
from datetime import datetime, timezone
import polars as pl


def generate_df():
    df = pl.DataFrame(
        {
            "event_type": ["playback"] * 1000,
            "event_origin": ["origin1"] * 1000,
            "event_send_at": [datetime.now(timezone.utc)] * 1000,
            "event_saved_at": [datetime.now(timezone.utc)] * 1000,
            "data": [
                {
                    "calendarKey": "calendarKey",
                    "id": str(i),
                    "referenceId": f"ref-{i}",
                }
                for i in range(1000)
            ],
        }
    )
    return df


df = generate_df()
catalog = InMemoryCatalog("default", warehouse="/tmp/iceberg")
catalog.create_namespace("default")
table = catalog.create_table(
    "default.leak", schema=df.to_arrow().schema, location="/tmp/iceberg/leak"
)
df = pl.DataFrame()

import gc, objgraph
from pyiceberg.manifest import _manifests


def get_max_value_size(manifest_cache):
    max_key, max_size = max(
        ((key, len(value)) for key, value in manifest_cache.items()),
        key=lambda item: item[1],
        default=(None, -1),
    )
    return max_key, max_size


tracemalloc.start()
for i in range(1000):
    df = generate_df()
    df.write_iceberg(table, mode="append")

    # debug
    manifest_cache = _manifests.cache
    print("ManifestFile instances alive:", objgraph.count("ManifestFile"))
    print(f"size: {len(manifest_cache)}, keys: {len(manifest_cache)}, values: {len(manifest_cache)}")
    max_key, max_size = get_max_value_size(manifest_cache)
    print(f"Max size of the cache value tuple: {max_size}, key: {max_key}")
    # gc.collect()

    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics("lineno")
    for stat in top_stats[:10]:
        print(stat)
    print()
```

Let me know if this makes sense.
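If this hypothesis holds, the cache stays capped at 128 entries, but the number of `ManifestFile` objects it pins does not. Here's a back-of-envelope count, assuming the n-th manifest list references all n manifests written so far (i.e. no manifest compaction):

```python
# Live ManifestFile objects pinned by a size-128 LRU cache after n fast
# appends: the cache retains the 128 most recent manifest lists, and the
# k-th manifest list holds k ManifestFile objects.
def pinned_objects(n: int, cache_size: int = 128) -> int:
    newest_lists = range(max(1, n - cache_size + 1), n + 1)
    return sum(newest_lists)

print(pinned_objects(128))   # 8256: 1 + 2 + ... + 128
print(pinned_objects(1000))  # 119872: each further append pins ~128 more objects
```

So even with a bounded cache, live objects grow roughly linearly (about `cache_size` per append) without bound, which would show up as a steadily climbing `objgraph.count("ManifestFile")` in the script above.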
I'd like to hear what others think.
