kevinjqliu commented on issue #2325:
URL: 
https://github.com/apache/iceberg-python/issues/2325#issuecomment-3193881084

   Thanks for creating the issue. The repro script is super helpful. 
   
   This is my current hypothesis: 
   I think what you're observing is a side effect of the write pattern and its interaction with the `_manifests` LRU cache.
   
   Appending the dataframe here always creates a new manifest since we use the ["fast append"](https://iceberg.apache.org/spec/#snapshots:~:text=is%20called%20a%20%E2%80%9C-,fast%20append,-%E2%80%9D.) strategy.
   This means each write generates a new manifest list. `_manifests`, with its LRU cache, caches the new manifest list's path along with the tuple of `ManifestFile` objects it references.
   Each cached manifest list path holds overlapping `ManifestFile` objects, since we're constantly adding new manifest files through append.
   So in an LRU cache of size 128, we cache 128 manifest list paths: the first cached item holds a tuple of 1 `ManifestFile`, and the 128th cached item holds a tuple of 128 `ManifestFile` objects. Most of these `ManifestFile` objects refer to the same underlying manifest file, but each one is a new Python object.
   ```
   # cache key: value, str: tuple[ManifestFile]
   1: (ManifestFile1)
   2: (ManifestFile1, ManifestFile2)
   3: (ManifestFile1, ManifestFile2, ManifestFile3)
   ...
   n: (ManifestFile1, ..., ManifestFilen-1, ManifestFilen)
   ```
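   
   The quadratic growth sketched above can be simulated with a stand-in for `_manifests` (hypothetical names; this is not the real pyiceberg internals, just an illustration of the caching pattern):
   
   ```python
   from functools import lru_cache
   
   class ManifestFile:
       """Stand-in for pyiceberg.manifest.ManifestFile (illustration only)."""
       def __init__(self, path: str) -> None:
           self.path = path
   
   @lru_cache(maxsize=128)
   def read_manifest_list(manifest_list_path: str) -> tuple:
       # After the n-th fast append, the manifest list references n manifests,
       # so the cached value for that path is a tuple of n brand-new objects.
       n = int(manifest_list_path.rsplit("-", 1)[-1])
       return tuple(ManifestFile(f"manifest-{i}.avro") for i in range(n))
   
   # Simulate 128 appends, each reading its own (new) manifest list path.
   for n in range(1, 129):
       read_manifest_list(f"snap-{n}")
   
   # Objects pinned by the cache: 1 + 2 + ... + 128 = 128 * 129 / 2
   total = sum(len(read_manifest_list(f"snap-{n}")) for n in range(1, 129))
   print(total)  # 8256
   ```
   
   So once the cache is full, thousands of `ManifestFile` objects stay reachable at once, even though they describe far fewer underlying manifest files.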
   
   Each of these `ManifestFile` objects holds on to the data read by the Avro reader.
   This is the top allocation, even above the Avro reader objects; I believe these are just `ManifestFile` objects:
   ```
   /Users/kevinliu/repos/iceberg-python/pyiceberg/typedef.py:183: size=1146 KiB, count=12750, average=92 B
   ```
   
   Here's a modified repro script:
   ```
   from pyiceberg.catalog.memory import InMemoryCatalog
   import tracemalloc
   from datetime import datetime, timezone
   import polars as pl
   
   def generate_df():
       df = pl.DataFrame(
           {
               "event_type": ["playback"] * 1000,
               "event_origin": ["origin1"] * 1000,
               "event_send_at": [datetime.now(timezone.utc)] * 1000,
               "event_saved_at": [datetime.now(timezone.utc)] * 1000,
               "data": [
                   {
                       "calendarKey": "calendarKey",
                       "id": str(i),
                       "referenceId": f"ref-{i}",
                   }
                   for i in range(1000)
               ],
           }
       )
       return df
   
   df = generate_df()
   catalog = InMemoryCatalog("default", warehouse="/tmp/iceberg")
   catalog.create_namespace("default")
   table = catalog.create_table(
       "default.leak", schema=df.to_arrow().schema, location="/tmp/iceberg/leak"
   )
   
   df = pl.DataFrame()
   import gc, objgraph
   
   from pyiceberg.manifest import _manifests
   
   def get_max_value_size(manifest_cache):
       max_key, max_size = max(
           ((key, len(value)) for key, value in manifest_cache.items()),
           key=lambda item: item[1],
           default=(None, -1)
       )
       return max_key, max_size
   
   
   tracemalloc.start()
   for i in range(1000):
       df = generate_df()
       df.write_iceberg(table, mode="append")
   
       # debug
       manifest_cache = _manifests.cache
       print("ManifestFile instances alive:", objgraph.count("ManifestFile"))
       print(f"cache size: {len(manifest_cache)}")
       max_key, max_size = get_max_value_size(manifest_cache)
       print(f"Max size of the cache value tuple: {max_size}, key: {max_key}")
       # gc.collect()
       snapshot = tracemalloc.take_snapshot()
       top_stats = snapshot.statistics("lineno")
       for stat in top_stats[:10]:
           print(stat)
   
       print()
   ```
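   
   One way to test this hypothesis (assuming `_manifests` exposes the standard `functools.lru_cache` interface, i.e. has a `cache_clear()` method) would be to clear the cache between appends and check whether the live `ManifestFile` count stops growing. A self-contained illustration with the same stand-in cache pattern:
   
   ```python
   from functools import lru_cache
   
   @lru_cache(maxsize=128)
   def read_manifest_list(path: str) -> tuple:
       # Stand-in for _manifests: each new path caches a growing tuple.
       n = int(path.rsplit("-", 1)[-1])
       return tuple(object() for _ in range(n))
   
   for n in range(1, 129):
       read_manifest_list(f"snap-{n}")
   print(read_manifest_list.cache_info().currsize)  # 128 entries pinned
   
   read_manifest_list.cache_clear()  # releases every cached tuple at once
   print(read_manifest_list.cache_info().currsize)  # 0
   ```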
   
   Let me know if this makes sense. I'd like to hear what others think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
