kevinjqliu commented on code in PR #2951:
URL: https://github.com/apache/iceberg-python/pull/2951#discussion_r2725930300


##########
tests/utils/test_manifest.py:
##########
@@ -629,3 +639,268 @@ def test_file_format_case_insensitive(raw_file_format: 
str, expected_file_format
     else:
         with pytest.raises(ValueError):
             _ = FileFormat(raw_file_format)
+
+
+def test_manifest_cache_deduplicates_manifest_files() -> None:
+    """Test that the manifest cache deduplicates ManifestFile objects across 
manifest lists.
+
+    This test verifies the fix for 
https://github.com/apache/iceberg-python/issues/2325
+
+    The issue was that when caching manifest lists by their path, overlapping 
ManifestFile
+    objects were duplicated. For example:
+    - ManifestList1: (ManifestFile1)
+    - ManifestList2: (ManifestFile1, ManifestFile2)
+    - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+    With the old approach, ManifestFile1 was stored 3 times in the cache.
+    With the new approach, ManifestFile objects are cached individually by 
their
+    manifest_path, so ManifestFile1 is stored only once and reused.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        # Create three manifest files to simulate manifests created during 
appends
+        manifest1_path = f"{tmp_dir}/manifest1.avro"
+        manifest2_path = f"{tmp_dir}/manifest2.avro"
+        manifest3_path = f"{tmp_dir}/manifest3.avro"
+
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create manifest file 1
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest1_path),
+            snapshot_id=1,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file1 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data1.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=100,
+                file_size_in_bytes=1000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=1,
+                    data_file=data_file1,
+                )
+            )
+        manifest_file1 = writer.to_manifest_file()
+
+        # Create manifest file 2
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest2_path),
+            snapshot_id=2,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file2 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data2.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=200,
+                file_size_in_bytes=2000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=2,
+                    data_file=data_file2,
+                )
+            )
+        manifest_file2 = writer.to_manifest_file()
+
+        # Create manifest file 3
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest3_path),
+            snapshot_id=3,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file3 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data3.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=300,
+                file_size_in_bytes=3000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=3,
+                    data_file=data_file3,
+                )
+            )
+        manifest_file3 = writer.to_manifest_file()
+
+        # Create manifest list 1: contains only manifest1
+        manifest_list1_path = f"{tmp_dir}/manifest-list1.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list1_path),
+            snapshot_id=1,
+            parent_snapshot_id=None,
+            sequence_number=1,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1])
+
+        # Create manifest list 2: contains manifest1 and manifest2 
(overlapping manifest1)
+        manifest_list2_path = f"{tmp_dir}/manifest-list2.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list2_path),
+            snapshot_id=2,
+            parent_snapshot_id=1,
+            sequence_number=2,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2])
+
+        # Create manifest list 3: contains all three manifests (overlapping 
manifest1 and manifest2)
+        manifest_list3_path = f"{tmp_dir}/manifest-list3.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list3_path),
+            snapshot_id=3,
+            parent_snapshot_id=2,
+            sequence_number=3,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2, 
manifest_file3])
+
+        # Clear the cache before testing
+        _manifest_cache.clear()
+
+        # Read all three manifest lists
+        manifests1 = _manifests(io, manifest_list1_path)
+        manifests2 = _manifests(io, manifest_list2_path)
+        manifests3 = _manifests(io, manifest_list3_path)
+
+        # Verify the manifest files have the expected paths
+        assert len(manifests1) == 1
+        assert len(manifests2) == 2
+        assert len(manifests3) == 3
+
+        # Verify that ManifestFile objects with the same manifest_path are the 
same object (identity)
+        # This is the key assertion - if caching works correctly, the same 
ManifestFile
+        # object should be reused instead of creating duplicates
+
+        # manifest_file1 appears in all three lists - should be the same object
+        assert manifests1[0] is manifests2[0], "ManifestFile1 should be the 
same object instance across manifest lists"
+        assert manifests2[0] is manifests3[0], "ManifestFile1 should be the 
same object instance across manifest lists"
+
+        # manifest_file2 appears in lists 2 and 3 - should be the same object
+        assert manifests2[1] is manifests3[1], "ManifestFile2 should be the 
same object instance across manifest lists"
+
+        # Verify cache size - should only have 3 unique ManifestFile objects
+        # instead of 1 + 2 + 3 = 6 objects as with the old approach
+        assert len(_manifest_cache) == 3, (
+            f"Cache should contain exactly 3 unique ManifestFile objects, but 
has {len(_manifest_cache)}"
+        )
+
+
+def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
+    """Test that the manifest cache remains efficient with many overlapping 
manifest lists.
+
+    This simulates the scenario from GitHub issue #2325 where many appends 
create
+    manifest lists that increasingly overlap.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        num_manifests = 10
+        manifest_files = []
+
+        # Create N manifest files
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="zstandard",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100 * (i + 1),
+                    file_size_in_bytes=1000 * (i + 1),
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create N manifest lists, each containing an increasing number of 
manifests
+        # list[i] contains manifests[0:i+1]
+        manifest_list_paths = []
+        for i in range(num_manifests):
+            list_path = f"{tmp_dir}/manifest-list{i}.avro"
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="zstandard",
+            ) as list_writer:
+                list_writer.add_manifests(manifest_files[: i + 1])
+            manifest_list_paths.append(list_path)

Review Comment:
   ignoring since this is in tests, we're only tests manifest cache behavior



##########
tests/benchmark/test_memory_benchmark.py:
##########
@@ -0,0 +1,281 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmarks for manifest cache efficiency.
+
+These benchmarks reproduce the manifest cache memory issue described in:
+https://github.com/apache/iceberg-python/issues/2325
+
+The issue: When caching manifest lists as tuples, overlapping ManifestFile 
objects
+are duplicated across cache entries, causing O(N²) memory growth instead of 
O(N).
+
+Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m 
benchmark
+"""
+
+import gc
+import tracemalloc
+from datetime import datetime, timezone
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.manifest import _manifest_cache
+
+
+def generate_test_dataframe() -> pa.Table:
+    """Generate a PyArrow table for testing, similar to the issue's example."""
+    n_rows = 100  # Smaller for faster tests, increase for more realistic 
benchmarks
+
+    return pa.table(
+        {
+            "event_type": ["playback"] * n_rows,
+            "event_origin": ["origin1"] * n_rows,
+            "event_send_at": [datetime.now(timezone.utc)] * n_rows,
+            "event_saved_at": [datetime.now(timezone.utc)] * n_rows,
+            "id": list(range(n_rows)),
+            "reference_id": [f"ref-{i}" for i in range(n_rows)],
+        }
+    )
+
+
[email protected]
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> 
InMemoryCatalog:
+    """Create an in-memory catalog for memory testing."""
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", 
warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
[email protected](autouse=True)
+def clear_caches() -> None:
+    """Clear caches before each test."""
+    _manifest_cache.clear()
+    gc.collect()
+
+
[email protected]
+def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Benchmark memory growth of manifest cache during repeated appends.
+
+    This test reproduces the issue from GitHub #2325 where each append creates
+    a new manifest list entry in the cache, causing memory to grow.
+
+    With the old caching strategy (tuple per manifest list), memory grew as 
O(N²).
+    With the new strategy (individual ManifestFile objects), memory grows as 
O(N).
+    """
+    df = generate_test_dataframe()
+    table = memory_catalog.create_table("default.memory_test", 
schema=df.schema)
+
+    tracemalloc.start()
+
+    num_iterations = 50
+    memory_samples: list[tuple[int, int, int]] = []  # (iteration, 
current_memory, cache_size)
+
+    print("\n--- Manifest Cache Memory Growth Benchmark ---")
+    print(f"Running {num_iterations} append operations...")
+
+    for i in range(num_iterations):
+        table.append(df)
+
+        # Sample memory at intervals
+        if (i + 1) % 10 == 0:
+            current, _ = tracemalloc.get_traced_memory()
+            cache_size = len(_manifest_cache)
+
+            memory_samples.append((i + 1, current, cache_size))
+            print(f"  Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache 
entries={cache_size}")
+
+    tracemalloc.stop()
+
+    # Analyze memory growth
+    if len(memory_samples) >= 2:
+        first_memory = memory_samples[0][1]
+        last_memory = memory_samples[-1][1]
+        memory_growth = last_memory - first_memory
+        growth_per_iteration = memory_growth / (memory_samples[-1][0] - 
memory_samples[0][0])
+
+        print("\nResults:")
+        print(f"  Initial memory: {first_memory / 1024:.1f} KB")
+        print(f"  Final memory: {last_memory / 1024:.1f} KB")
+        print(f"  Total growth: {memory_growth / 1024:.1f} KB")
+        print(f"  Growth per iteration: {growth_per_iteration:.1f} bytes")
+        print(f"  Final cache size: {memory_samples[-1][2]} entries")
+
+        # With efficient caching, growth should be roughly linear (O(N))
+        # rather than quadratic (O(N²)) as it was before
+        # Memory growth includes ManifestFile objects, metadata, and other 
overhead
+        # We expect about 5-10 KB per iteration for typical workloads
+        # The key improvement is that growth is O(N) not O(N²)
+        assert growth_per_iteration < 15000, (
+            f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) 
is too high. "
+            "This may indicate the O(N²) cache inefficiency is present."
+        )
+
+
[email protected]
+def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) 
-> None:
+    """Test that clearing the cache allows memory to be reclaimed.
+
+    This test verifies that when we clear the manifest cache, the associated
+    memory can be garbage collected.
+    """
+    df = generate_test_dataframe()
+    table = memory_catalog.create_table("default.gc_test", schema=df.schema)
+
+    tracemalloc.start()
+
+    print("\n--- Memory After GC Benchmark ---")
+
+    # Phase 1: Fill the cache
+    print("Phase 1: Filling cache with 20 appends...")
+    for _ in range(20):
+        table.append(df)
+
+    gc.collect()
+    before_clear_memory, _ = tracemalloc.get_traced_memory()
+    cache_size_before = len(_manifest_cache)
+    print(f"  Memory before clear: {before_clear_memory / 1024:.1f} KB")
+    print(f"  Cache size: {cache_size_before}")
+
+    # Phase 2: Clear cache and GC
+    print("\nPhase 2: Clearing cache and running GC...")
+    _manifest_cache.clear()
+    gc.collect()
+    gc.collect()  # Multiple GC passes for thorough cleanup
+
+    after_clear_memory, _ = tracemalloc.get_traced_memory()
+    print(f"  Memory after clear: {after_clear_memory / 1024:.1f} KB")
+    print(f"  Memory reclaimed: {(before_clear_memory - after_clear_memory) / 
1024:.1f} KB")
+
+    tracemalloc.stop()
+
+    memory_reclaimed = before_clear_memory - after_clear_memory
+    print("\nResults:")
+    print(f"  Memory reclaimed by clearing cache: {memory_reclaimed / 
1024:.1f} KB")
+
+
[email protected]
+def test_manifest_cache_deduplication_efficiency() -> None:
+    """Benchmark the efficiency of the per-ManifestFile caching strategy.
+
+    This test verifies that when multiple manifest lists share the same
+    ManifestFile objects, they are properly deduplicated in the cache.
+    """
+    from tempfile import TemporaryDirectory
+
+    from pyiceberg.io.pyarrow import PyArrowFileIO
+    from pyiceberg.manifest import (
+        DataFile,
+        DataFileContent,
+        FileFormat,
+        ManifestEntry,
+        ManifestEntryStatus,
+        _manifests,
+        write_manifest,
+        write_manifest_list,
+    )
+    from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
+    from pyiceberg.schema import Schema
+    from pyiceberg.typedef import Record
+    from pyiceberg.types import IntegerType, NestedField
+
+    io = PyArrowFileIO()
+
+    print("\n--- Manifest Cache Deduplication Benchmark ---")
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create N manifest files
+        num_manifests = 20
+        manifest_files = []
+
+        print(f"Creating {num_manifests} manifest files...")
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest_{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="null",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data_{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100,
+                    file_size_in_bytes=1000,
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create multiple manifest lists with overlapping manifest files
+        # List i contains manifest files 0 through i
+        num_lists = 10
+        print(f"Creating {num_lists} manifest lists with overlapping 
manifests...")
+
+        _manifest_cache.clear()
+
+        for i in range(num_lists):
+            list_path = f"{tmp_dir}/manifest-list_{i}.avro"
+            manifests_to_include = manifest_files[: i + 1]
+
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="null",
+            ) as list_writer:
+                list_writer.add_manifests(manifests_to_include)
+
+            # Read the manifest list using _manifests (this populates the 
cache)
+            _manifests(io, list_path)
+

Review Comment:
   ignoring since this is in tests, we're only tests manifest cache behavior



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to