Copilot commented on code in PR #2951:
URL: https://github.com/apache/iceberg-python/pull/2951#discussion_r2725915886


##########
tests/utils/test_manifest.py:
##########
@@ -629,3 +639,268 @@ def test_file_format_case_insensitive(raw_file_format: 
str, expected_file_format
     else:
         with pytest.raises(ValueError):
             _ = FileFormat(raw_file_format)
+
+
+def test_manifest_cache_deduplicates_manifest_files() -> None:
+    """Test that the manifest cache deduplicates ManifestFile objects across 
manifest lists.
+
+    This test verifies the fix for 
https://github.com/apache/iceberg-python/issues/2325
+
+    The issue was that when caching manifest lists by their path, overlapping 
ManifestFile
+    objects were duplicated. For example:
+    - ManifestList1: (ManifestFile1)
+    - ManifestList2: (ManifestFile1, ManifestFile2)
+    - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+    With the old approach, ManifestFile1 was stored 3 times in the cache.
+    With the new approach, ManifestFile objects are cached individually by 
their
+    manifest_path, so ManifestFile1 is stored only once and reused.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        # Create three manifest files to simulate manifests created during 
appends
+        manifest1_path = f"{tmp_dir}/manifest1.avro"
+        manifest2_path = f"{tmp_dir}/manifest2.avro"
+        manifest3_path = f"{tmp_dir}/manifest3.avro"
+
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create manifest file 1
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest1_path),
+            snapshot_id=1,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file1 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data1.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=100,
+                file_size_in_bytes=1000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=1,
+                    data_file=data_file1,
+                )
+            )
+        manifest_file1 = writer.to_manifest_file()
+
+        # Create manifest file 2
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest2_path),
+            snapshot_id=2,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file2 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data2.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=200,
+                file_size_in_bytes=2000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=2,
+                    data_file=data_file2,
+                )
+            )
+        manifest_file2 = writer.to_manifest_file()
+
+        # Create manifest file 3
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest3_path),
+            snapshot_id=3,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file3 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data3.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=300,
+                file_size_in_bytes=3000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=3,
+                    data_file=data_file3,
+                )
+            )
+        manifest_file3 = writer.to_manifest_file()
+
+        # Create manifest list 1: contains only manifest1
+        manifest_list1_path = f"{tmp_dir}/manifest-list1.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list1_path),
+            snapshot_id=1,
+            parent_snapshot_id=None,
+            sequence_number=1,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1])
+
+        # Create manifest list 2: contains manifest1 and manifest2 
(overlapping manifest1)
+        manifest_list2_path = f"{tmp_dir}/manifest-list2.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list2_path),
+            snapshot_id=2,
+            parent_snapshot_id=1,
+            sequence_number=2,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2])
+
+        # Create manifest list 3: contains all three manifests (overlapping 
manifest1 and manifest2)
+        manifest_list3_path = f"{tmp_dir}/manifest-list3.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list3_path),
+            snapshot_id=3,
+            parent_snapshot_id=2,
+            sequence_number=3,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2, 
manifest_file3])
+
+        # Clear the cache before testing
+        _manifest_cache.clear()
+
+        # Read all three manifest lists
+        manifests1 = _manifests(io, manifest_list1_path)
+        manifests2 = _manifests(io, manifest_list2_path)
+        manifests3 = _manifests(io, manifest_list3_path)
+
+        # Verify the manifest files have the expected paths
+        assert len(manifests1) == 1
+        assert len(manifests2) == 2
+        assert len(manifests3) == 3
+
+        # Verify that ManifestFile objects with the same manifest_path are the 
same object (identity)
+        # This is the key assertion - if caching works correctly, the same 
ManifestFile
+        # object should be reused instead of creating duplicates
+
+        # manifest_file1 appears in all three lists - should be the same object
+        assert manifests1[0] is manifests2[0], "ManifestFile1 should be the 
same object instance across manifest lists"
+        assert manifests2[0] is manifests3[0], "ManifestFile1 should be the 
same object instance across manifest lists"
+
+        # manifest_file2 appears in lists 2 and 3 - should be the same object
+        assert manifests2[1] is manifests3[1], "ManifestFile2 should be the 
same object instance across manifest lists"
+
+        # Verify cache size - should only have 3 unique ManifestFile objects
+        # instead of 1 + 2 + 3 = 6 objects as with the old approach
+        assert len(_manifest_cache) == 3, (
+            f"Cache should contain exactly 3 unique ManifestFile objects, but 
has {len(_manifest_cache)}"
+        )
+
+
+def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
+    """Test that the manifest cache remains efficient with many overlapping 
manifest lists.
+
+    This simulates the scenario from GitHub issue #2325 where many appends 
create
+    manifest lists that increasingly overlap.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        num_manifests = 10
+        manifest_files = []
+
+        # Create N manifest files
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="zstandard",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100 * (i + 1),
+                    file_size_in_bytes=1000 * (i + 1),
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create N manifest lists, each containing an increasing number of 
manifests
+        # list[i] contains manifests[0:i+1]
+        manifest_list_paths = []
+        for i in range(num_manifests):
+            list_path = f"{tmp_dir}/manifest-list{i}.avro"
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="zstandard",
+            ) as list_writer:
+                list_writer.add_manifests(manifest_files[: i + 1])
+            manifest_list_paths.append(list_path)
+
+        # Clear the cache
+        _manifest_cache.clear()
+
+        # Read all manifest lists
+        all_results = []
+        for path in manifest_list_paths:
+            result = _manifests(io, path)
+            all_results.append(result)
+
+        # With the old cache approach, we would have:
+        # 1 + 2 + 3 + ... + N = N*(N+1)/2 ManifestFile objects in memory
+        # With the new approach, we should have exactly N objects
+
+        # Verify cache has exactly N unique entries
+        assert len(_manifest_cache) == num_manifests, (
+            f"Cache should contain exactly {num_manifests} ManifestFile 
objects, "
+            f"but has {len(_manifest_cache)}. "
+            f"Old approach would have {num_manifests * (num_manifests + 1) // 
2} objects."
+        )
+
+        # Verify object identity - all references to the same manifest should 
be the same object
+        for i in range(num_manifests):
+            manifest_path = manifest_files[i].manifest_path

Review Comment:
   Variable manifest_path is not used.
   ```suggestion
   
   ```



##########
tests/utils/test_manifest.py:
##########
@@ -629,3 +639,268 @@ def test_file_format_case_insensitive(raw_file_format: 
str, expected_file_format
     else:
         with pytest.raises(ValueError):
             _ = FileFormat(raw_file_format)
+
+
+def test_manifest_cache_deduplicates_manifest_files() -> None:
+    """Test that the manifest cache deduplicates ManifestFile objects across 
manifest lists.
+
+    This test verifies the fix for 
https://github.com/apache/iceberg-python/issues/2325
+
+    The issue was that when caching manifest lists by their path, overlapping 
ManifestFile
+    objects were duplicated. For example:
+    - ManifestList1: (ManifestFile1)
+    - ManifestList2: (ManifestFile1, ManifestFile2)
+    - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+    With the old approach, ManifestFile1 was stored 3 times in the cache.
+    With the new approach, ManifestFile objects are cached individually by 
their
+    manifest_path, so ManifestFile1 is stored only once and reused.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        # Create three manifest files to simulate manifests created during 
appends
+        manifest1_path = f"{tmp_dir}/manifest1.avro"
+        manifest2_path = f"{tmp_dir}/manifest2.avro"
+        manifest3_path = f"{tmp_dir}/manifest3.avro"
+
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create manifest file 1
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest1_path),
+            snapshot_id=1,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file1 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data1.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=100,
+                file_size_in_bytes=1000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=1,
+                    data_file=data_file1,
+                )
+            )
+        manifest_file1 = writer.to_manifest_file()
+

Review Comment:
   `writer.to_manifest_file()` is being called after the `write_manifest(...)` 
context manager has exited. Most existing tests in this file call 
`to_manifest_file()` inside the `with` block; calling it after `__exit__` 
relies on writer internals still being usable post-close and can become 
fragile. Consider moving `to_manifest_file()` inside the `with` block (before 
exit) for consistency and robustness.



##########
pyiceberg/manifest.py:
##########
@@ -892,15 +891,52 @@ def __hash__(self) -> int:
         return hash(self.manifest_path)
 
 
-# Global cache for manifest lists
-_manifest_cache: LRUCache[Any, tuple[ManifestFile, ...]] = 
LRUCache(maxsize=128)
+# Global cache for individual ManifestFile objects, keyed by manifest_path.
+# This avoids duplicating ManifestFile objects when multiple manifest lists
+# share the same manifests (which is common after appends).
+_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=512)
+
+# Lock for thread-safe cache access
+_manifest_cache_lock = threading.RLock()
 
 
-@cached(cache=_manifest_cache, key=lambda io, manifest_list: 
hashkey(manifest_list), lock=threading.RLock())
 def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
-    """Read and cache manifests from the given manifest list, returning a 
tuple to prevent modification."""
+    """Read manifests from the given manifest list, caching individual 
ManifestFile objects.
+
+    Unlike caching entire manifest lists, this approach caches individual 
ManifestFile
+    objects by their manifest_path. This is more memory-efficient because:
+    - ManifestList1 contains: (ManifestFile1)
+    - ManifestList2 contains: (ManifestFile1, ManifestFile2)
+    - ManifestList3 contains: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+    With per-ManifestFile caching, ManifestFile1 is stored only once and 
reused,
+    instead of being duplicated in each manifest list's cached tuple.
+
+    Args:
+        io: The FileIO to read the manifest list.
+        manifest_list: The path to the manifest list file.
+
+    Returns:
+        A tuple of ManifestFile objects (tuple to prevent modification).
+    """
+    # Read manifest list outside the lock to avoid blocking other threads 
during I/O
     file = io.new_input(manifest_list)
-    return tuple(read_manifest_list(file))
+    manifest_files = list(read_manifest_list(file))
+

Review Comment:
   `_manifests` no longer caches results per `manifest_list` path (the previous 
`@cached(...)` decorator was removed), so repeated calls will re-read the 
manifest list file each time. If callers invoke `Snapshot.manifests(io)` 
multiple times, this is an I/O regression compared to the prior behavior. 
Consider adding a small separate cache keyed by `manifest_list` (e.g., to a 
tuple of manifest paths or to a tuple of the already-deduped cached 
`ManifestFile` objects) so you keep per-manifest deduplication without 
re-reading the same manifest list repeatedly.



##########
tests/utils/test_manifest.py:
##########
@@ -629,3 +639,268 @@ def test_file_format_case_insensitive(raw_file_format: 
str, expected_file_format
     else:
         with pytest.raises(ValueError):
             _ = FileFormat(raw_file_format)
+
+
+def test_manifest_cache_deduplicates_manifest_files() -> None:
+    """Test that the manifest cache deduplicates ManifestFile objects across 
manifest lists.
+
+    This test verifies the fix for 
https://github.com/apache/iceberg-python/issues/2325
+
+    The issue was that when caching manifest lists by their path, overlapping 
ManifestFile
+    objects were duplicated. For example:
+    - ManifestList1: (ManifestFile1)
+    - ManifestList2: (ManifestFile1, ManifestFile2)
+    - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+    With the old approach, ManifestFile1 was stored 3 times in the cache.
+    With the new approach, ManifestFile objects are cached individually by 
their
+    manifest_path, so ManifestFile1 is stored only once and reused.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        # Create three manifest files to simulate manifests created during 
appends
+        manifest1_path = f"{tmp_dir}/manifest1.avro"
+        manifest2_path = f"{tmp_dir}/manifest2.avro"
+        manifest3_path = f"{tmp_dir}/manifest3.avro"
+
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create manifest file 1
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest1_path),
+            snapshot_id=1,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file1 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data1.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=100,
+                file_size_in_bytes=1000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=1,
+                    data_file=data_file1,
+                )
+            )
+        manifest_file1 = writer.to_manifest_file()
+
+        # Create manifest file 2
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest2_path),
+            snapshot_id=2,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file2 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data2.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=200,
+                file_size_in_bytes=2000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=2,
+                    data_file=data_file2,
+                )
+            )
+        manifest_file2 = writer.to_manifest_file()
+
+        # Create manifest file 3
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest3_path),
+            snapshot_id=3,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file3 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data3.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=300,
+                file_size_in_bytes=3000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=3,
+                    data_file=data_file3,
+                )
+            )
+        manifest_file3 = writer.to_manifest_file()
+
+        # Create manifest list 1: contains only manifest1
+        manifest_list1_path = f"{tmp_dir}/manifest-list1.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list1_path),
+            snapshot_id=1,
+            parent_snapshot_id=None,
+            sequence_number=1,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1])
+
+        # Create manifest list 2: contains manifest1 and manifest2 
(overlapping manifest1)
+        manifest_list2_path = f"{tmp_dir}/manifest-list2.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list2_path),
+            snapshot_id=2,
+            parent_snapshot_id=1,
+            sequence_number=2,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2])
+

Review Comment:
   In this test, `manifest_file1` is produced via `writer.to_manifest_file()` 
which sets `sequence_number`/`min_sequence_number` to `UNASSIGNED_SEQ`. Reusing 
it in a later snapshot’s `write_manifest_list` (snapshot_id=2) will raise in 
`ManifestListWriterV2.prepare_manifest` because `added_snapshot_id` (1) != 
`commit_snapshot_id` (2). To model real append behavior, either (a) read 
`manifest_file1` back from the manifest list written for snapshot 1 (so 
sequence numbers are assigned), or (b) explicitly assign valid sequence numbers 
before including it in subsequent manifest lists.



##########
tests/benchmark/test_memory_benchmark.py:
##########
@@ -0,0 +1,281 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmarks for manifest cache efficiency.
+
+These benchmarks reproduce the manifest cache memory issue described in:
+https://github.com/apache/iceberg-python/issues/2325
+
+The issue: When caching manifest lists as tuples, overlapping ManifestFile 
objects
+are duplicated across cache entries, causing O(N²) memory growth instead of 
O(N).
+
+Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m 
benchmark
+"""
+
+import gc
+import tracemalloc
+from datetime import datetime, timezone
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.manifest import _manifest_cache
+
+
+def generate_test_dataframe() -> pa.Table:
+    """Generate a PyArrow table for testing, similar to the issue's example."""
+    n_rows = 100  # Smaller for faster tests, increase for more realistic 
benchmarks
+
+    return pa.table(
+        {
+            "event_type": ["playback"] * n_rows,
+            "event_origin": ["origin1"] * n_rows,
+            "event_send_at": [datetime.now(timezone.utc)] * n_rows,
+            "event_saved_at": [datetime.now(timezone.utc)] * n_rows,
+            "id": list(range(n_rows)),
+            "reference_id": [f"ref-{i}" for i in range(n_rows)],
+        }
+    )
+
+
[email protected]
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> 
InMemoryCatalog:
+    """Create an in-memory catalog for memory testing."""
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", 
warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
[email protected](autouse=True)
+def clear_caches() -> None:
+    """Clear caches before each test."""
+    _manifest_cache.clear()
+    gc.collect()
+
+
[email protected]
+def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Benchmark memory growth of manifest cache during repeated appends.
+
+    This test reproduces the issue from GitHub #2325 where each append creates
+    a new manifest list entry in the cache, causing memory to grow.
+
+    With the old caching strategy (tuple per manifest list), memory grew as 
O(N²).
+    With the new strategy (individual ManifestFile objects), memory grows as 
O(N).
+    """
+    df = generate_test_dataframe()
+    table = memory_catalog.create_table("default.memory_test", 
schema=df.schema)
+
+    tracemalloc.start()
+
+    num_iterations = 50
+    memory_samples: list[tuple[int, int, int]] = []  # (iteration, 
current_memory, cache_size)
+
+    print("\n--- Manifest Cache Memory Growth Benchmark ---")
+    print(f"Running {num_iterations} append operations...")
+
+    for i in range(num_iterations):
+        table.append(df)
+
+        # Sample memory at intervals
+        if (i + 1) % 10 == 0:
+            current, _ = tracemalloc.get_traced_memory()
+            cache_size = len(_manifest_cache)
+
+            memory_samples.append((i + 1, current, cache_size))
+            print(f"  Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache 
entries={cache_size}")
+
+    tracemalloc.stop()
+
+    # Analyze memory growth
+    if len(memory_samples) >= 2:
+        first_memory = memory_samples[0][1]
+        last_memory = memory_samples[-1][1]
+        memory_growth = last_memory - first_memory
+        growth_per_iteration = memory_growth / (memory_samples[-1][0] - 
memory_samples[0][0])
+
+        print("\nResults:")
+        print(f"  Initial memory: {first_memory / 1024:.1f} KB")
+        print(f"  Final memory: {last_memory / 1024:.1f} KB")
+        print(f"  Total growth: {memory_growth / 1024:.1f} KB")
+        print(f"  Growth per iteration: {growth_per_iteration:.1f} bytes")
+        print(f"  Final cache size: {memory_samples[-1][2]} entries")
+
+        # With efficient caching, growth should be roughly linear (O(N))
+        # rather than quadratic (O(N²)) as it was before
+        # Memory growth includes ManifestFile objects, metadata, and other 
overhead
+        # We expect about 5-10 KB per iteration for typical workloads
+        # The key improvement is that growth is O(N) not O(N²)
+        assert growth_per_iteration < 15000, (
+            f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) 
is too high. "
+            "This may indicate the O(N²) cache inefficiency is present."
+        )
+
+
[email protected]
+def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) 
-> None:
+    """Test that clearing the cache allows memory to be reclaimed.
+
+    This test verifies that when we clear the manifest cache, the associated
+    memory can be garbage collected.
+    """
+    df = generate_test_dataframe()
+    table = memory_catalog.create_table("default.gc_test", schema=df.schema)
+
+    tracemalloc.start()
+
+    print("\n--- Memory After GC Benchmark ---")
+
+    # Phase 1: Fill the cache
+    print("Phase 1: Filling cache with 20 appends...")
+    for _ in range(20):
+        table.append(df)
+
+    gc.collect()
+    before_clear_memory, _ = tracemalloc.get_traced_memory()
+    cache_size_before = len(_manifest_cache)
+    print(f"  Memory before clear: {before_clear_memory / 1024:.1f} KB")
+    print(f"  Cache size: {cache_size_before}")
+
+    # Phase 2: Clear cache and GC
+    print("\nPhase 2: Clearing cache and running GC...")
+    _manifest_cache.clear()
+    gc.collect()
+    gc.collect()  # Multiple GC passes for thorough cleanup
+
+    after_clear_memory, _ = tracemalloc.get_traced_memory()
+    print(f"  Memory after clear: {after_clear_memory / 1024:.1f} KB")
+    print(f"  Memory reclaimed: {(before_clear_memory - after_clear_memory) / 
1024:.1f} KB")
+
+    tracemalloc.stop()
+
+    memory_reclaimed = before_clear_memory - after_clear_memory
+    print("\nResults:")
+    print(f"  Memory reclaimed by clearing cache: {memory_reclaimed / 
1024:.1f} KB")
+
+
[email protected]
+def test_manifest_cache_deduplication_efficiency() -> None:
+    """Benchmark the efficiency of the per-ManifestFile caching strategy.
+
+    This test verifies that when multiple manifest lists share the same
+    ManifestFile objects, they are properly deduplicated in the cache.
+    """
+    from tempfile import TemporaryDirectory
+
+    from pyiceberg.io.pyarrow import PyArrowFileIO
+    from pyiceberg.manifest import (
+        DataFile,
+        DataFileContent,
+        FileFormat,
+        ManifestEntry,
+        ManifestEntryStatus,
+        _manifests,
+        write_manifest,
+        write_manifest_list,
+    )
+    from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
+    from pyiceberg.schema import Schema
+    from pyiceberg.typedef import Record
+    from pyiceberg.types import IntegerType, NestedField
+
+    io = PyArrowFileIO()
+
+    print("\n--- Manifest Cache Deduplication Benchmark ---")
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create N manifest files
+        num_manifests = 20
+        manifest_files = []
+
+        print(f"Creating {num_manifests} manifest files...")
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest_{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="null",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data_{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100,
+                    file_size_in_bytes=1000,
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create multiple manifest lists with overlapping manifest files
+        # List i contains manifest files 0 through i
+        num_lists = 10
+        print(f"Creating {num_lists} manifest lists with overlapping 
manifests...")
+
+        _manifest_cache.clear()
+
+        for i in range(num_lists):
+            list_path = f"{tmp_dir}/manifest-list_{i}.avro"
+            manifests_to_include = manifest_files[: i + 1]
+
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="null",
+            ) as list_writer:
+                list_writer.add_manifests(manifests_to_include)
+
+            # Read the manifest list using _manifests (this populates the 
cache)
+            _manifests(io, list_path)
+

Review Comment:
   This benchmark creates v2 manifest lists that include manifests from earlier 
snapshots, but those `manifest_files` come from `writer.to_manifest_file()` 
(sequence numbers left as `UNASSIGNED_SEQ`). `write_manifest_list` will raise 
in `ManifestListWriterV2.prepare_manifest` when it encounters an “existing” 
manifest whose `added_snapshot_id` differs from the list’s `snapshot_id`. To 
keep the benchmark runnable, use manifests read back from the prior manifest 
list (so sequence numbers are set) or assign appropriate sequence numbers for 
existing manifests before adding them to later manifest lists.



##########
tests/utils/test_manifest.py:
##########
@@ -629,3 +639,268 @@ def test_file_format_case_insensitive(raw_file_format: 
str, expected_file_format
     else:
         with pytest.raises(ValueError):
             _ = FileFormat(raw_file_format)
+
+
+def test_manifest_cache_deduplicates_manifest_files() -> None:
+    """Test that the manifest cache deduplicates ManifestFile objects across 
manifest lists.
+
+    This test verifies the fix for 
https://github.com/apache/iceberg-python/issues/2325
+
+    The issue was that when caching manifest lists by their path, overlapping 
ManifestFile
+    objects were duplicated. For example:
+    - ManifestList1: (ManifestFile1)
+    - ManifestList2: (ManifestFile1, ManifestFile2)
+    - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+    With the old approach, ManifestFile1 was stored 3 times in the cache.
+    With the new approach, ManifestFile objects are cached individually by 
their
+    manifest_path, so ManifestFile1 is stored only once and reused.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        # Create three manifest files to simulate manifests created during 
appends
+        manifest1_path = f"{tmp_dir}/manifest1.avro"
+        manifest2_path = f"{tmp_dir}/manifest2.avro"
+        manifest3_path = f"{tmp_dir}/manifest3.avro"
+
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create manifest file 1
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest1_path),
+            snapshot_id=1,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file1 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data1.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=100,
+                file_size_in_bytes=1000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=1,
+                    data_file=data_file1,
+                )
+            )
+        manifest_file1 = writer.to_manifest_file()
+
+        # Create manifest file 2
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest2_path),
+            snapshot_id=2,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file2 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data2.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=200,
+                file_size_in_bytes=2000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=2,
+                    data_file=data_file2,
+                )
+            )
+        manifest_file2 = writer.to_manifest_file()
+
+        # Create manifest file 3
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest3_path),
+            snapshot_id=3,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file3 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data3.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=300,
+                file_size_in_bytes=3000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=3,
+                    data_file=data_file3,
+                )
+            )
+        manifest_file3 = writer.to_manifest_file()
+
+        # Create manifest list 1: contains only manifest1
+        manifest_list1_path = f"{tmp_dir}/manifest-list1.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list1_path),
+            snapshot_id=1,
+            parent_snapshot_id=None,
+            sequence_number=1,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1])
+
+        # Create manifest list 2: contains manifest1 and manifest2 
(overlapping manifest1)
+        manifest_list2_path = f"{tmp_dir}/manifest-list2.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list2_path),
+            snapshot_id=2,
+            parent_snapshot_id=1,
+            sequence_number=2,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2])
+
+        # Create manifest list 3: contains all three manifests (overlapping 
manifest1 and manifest2)
+        manifest_list3_path = f"{tmp_dir}/manifest-list3.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list3_path),
+            snapshot_id=3,
+            parent_snapshot_id=2,
+            sequence_number=3,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2, 
manifest_file3])
+
+        # Clear the cache before testing
+        _manifest_cache.clear()
+
+        # Read all three manifest lists
+        manifests1 = _manifests(io, manifest_list1_path)
+        manifests2 = _manifests(io, manifest_list2_path)
+        manifests3 = _manifests(io, manifest_list3_path)
+
+        # Verify the manifest files have the expected paths
+        assert len(manifests1) == 1
+        assert len(manifests2) == 2
+        assert len(manifests3) == 3
+
+        # Verify that ManifestFile objects with the same manifest_path are the 
same object (identity)
+        # This is the key assertion - if caching works correctly, the same 
ManifestFile
+        # object should be reused instead of creating duplicates
+
+        # manifest_file1 appears in all three lists - should be the same object
+        assert manifests1[0] is manifests2[0], "ManifestFile1 should be the 
same object instance across manifest lists"
+        assert manifests2[0] is manifests3[0], "ManifestFile1 should be the 
same object instance across manifest lists"
+
+        # manifest_file2 appears in lists 2 and 3 - should be the same object
+        assert manifests2[1] is manifests3[1], "ManifestFile2 should be the 
same object instance across manifest lists"
+
+        # Verify cache size - should only have 3 unique ManifestFile objects
+        # instead of 1 + 2 + 3 = 6 objects as with the old approach
+        assert len(_manifest_cache) == 3, (
+            f"Cache should contain exactly 3 unique ManifestFile objects, but 
has {len(_manifest_cache)}"
+        )
+
+
+def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
+    """Test that the manifest cache remains efficient with many overlapping 
manifest lists.
+
+    This simulates the scenario from GitHub issue #2325 where many appends 
create
+    manifest lists that increasingly overlap.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        num_manifests = 10
+        manifest_files = []
+
+        # Create N manifest files
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="zstandard",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100 * (i + 1),
+                    file_size_in_bytes=1000 * (i + 1),
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create N manifest lists, each containing an increasing number of 
manifests
+        # list[i] contains manifests[0:i+1]
+        manifest_list_paths = []
+        for i in range(num_manifests):
+            list_path = f"{tmp_dir}/manifest-list{i}.avro"
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="zstandard",
+            ) as list_writer:
+                list_writer.add_manifests(manifest_files[: i + 1])
+            manifest_list_paths.append(list_path)

Review Comment:
   This loop builds manifest lists that include manifests from previous 
snapshots, but the `manifest_files` are created via `writer.to_manifest_file()` 
which leaves `sequence_number`/`min_sequence_number` as `UNASSIGNED_SEQ`. When 
`write_manifest_list` (v2) tries to include an older manifest in a newer 
snapshot’s list, `ManifestListWriterV2.prepare_manifest` will raise because 
`added_snapshot_id` != current `snapshot_id`. Fix by using manifests read from 
the previously-written manifest list (so sequence numbers are assigned) or by 
setting sequence numbers for “existing” manifests before writing subsequent 
manifest lists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to