mccormickt12 commented on code in PR #2432:
URL: https://github.com/apache/iceberg-python/pull/2432#discussion_r2356034273
##########
tests/table/test_init.py:
##########
@@ -265,8 +265,19 @@ def test_history(table_v2: Table) -> None:
]
-def test_table_scan_select(table_v2: Table) -> None:
- scan = table_v2.scan()
+@pytest.mark.parametrize(
+ "table_fixture",
+ [
+ pytest.param(pytest.lazy_fixture("table_v2"), id="parquet"),
+ pytest.param(pytest.lazy_fixture("table_v2_orc"), id="orc"),
+ ],
+)
+def test_table_scan_select(table_fixture: Table) -> None:
+ import logging
+
+ logger = logging.getLogger(__name__)
+ logger.debug(table_fixture.metadata)
Review Comment:
removed
##########
tests/io/test_pyarrow.py:
##########
@@ -2811,3 +2857,1871 @@ def test_parse_location_defaults() -> None:
assert scheme == "hdfs"
assert netloc == "netloc:8000"
assert path == "/foo/bar"
+
+
+def test_write_and_read_orc(tmp_path: Path) -> None:
+ """Test basic ORC write and read functionality."""
+ # Create a simple Arrow table
+ data = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
+ orc_path = tmp_path / "test.orc"
+ orc.write_table(data, str(orc_path))
+ # Read it back
+ orc_file = orc.ORCFile(str(orc_path))
+ table_read = orc_file.read()
+ assert table_read.equals(data)
+
+
+def test_orc_file_format_integration(tmp_path: Path) -> None:
+ """Test ORC file format integration with PyArrow dataset API."""
+    # This test mimics a minimal integration with PyIceberg's FileFormat enum and pyarrow.orc
+ import pyarrow.dataset as ds
+
+ data = pa.table({"a": [10, 20], "b": ["foo", "bar"]})
+ orc_path = tmp_path / "iceberg.orc"
+ orc.write_table(data, str(orc_path))
+ # Use PyArrow dataset API to read as ORC
+ dataset = ds.dataset(str(orc_path), format=ds.OrcFileFormat())
+ table_read = dataset.to_table()
+ assert table_read.equals(data)
+
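+
As background for the ArrowScan filter tests added further down, the same predicate pushdown can be exercised directly at the PyArrow dataset level against an ORC file. A minimal standalone sketch, not taken from the diff (the temp path and column names are illustrative):

    import tempfile
    from pathlib import Path

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.orc as orc

    with tempfile.TemporaryDirectory() as tmp:
        path = str(Path(tmp) / "example.orc")
        orc.write_table(pa.table({"a": [10, 20, 30], "b": ["foo", "bar", "baz"]}), path)

        # Filters passed to to_table() are pushed down into the ORC reader where possible.
        dataset = ds.dataset(path, format=ds.OrcFileFormat())
        filtered = dataset.to_table(filter=ds.field("a") > 10)
        assert filtered.column("a").to_pylist() == [20, 30]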
+
+def test_iceberg_read_orc(tmp_path: Path) -> None:
+ """
+ Integration test: Read ORC files via Iceberg API.
+ To run just this test:
+ pytest tests/io/test_pyarrow.py -k test_iceberg_read_orc
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema and data
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "name", StringType(), required=False),
+ )
+    data = pa.table({"id": pa.array([1, 2, 3], type=pa.int32()), "name": ["a", "b", "c"]})
+
+ # Create ORC file directly using PyArrow
+ orc_path = tmp_path / "test_data.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "write.format.default": "parquet",  # This doesn't matter for reading
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["name"]}]',  # Add name mapping for ORC files without field IDs
+ },
+ )
+ io = PyArrowFileIO()
+
+ # Create a DataFile pointing to the ORC file
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=3,
+ column_sizes={1: 12, 2: 12}, # Approximate sizes
+ value_counts={1: 3, 2: 3},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"a"}, # Approximate bounds
+ upper_bounds={1: b"\x03\x00\x00\x00", 2: b"c"},
+ split_offsets=None,
+ )
+ # Ensure spec_id is properly set
+ data_file.spec_id = 0
+
+ # Read back using ArrowScan
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+ scan_task = FileScanTask(data_file=data_file)
+ table_read = scan.to_table([scan_task])
+
+ # Compare data ignoring schema metadata (like not null constraints)
+ assert table_read.num_rows == data.num_rows
+ assert table_read.num_columns == data.num_columns
+ assert table_read.column_names == data.column_names
+
+ # Compare actual column data values
+ for col_name in data.column_names:
+        assert table_read.column(col_name).to_pylist() == data.column(col_name).to_pylist()
+
+
+def test_orc_row_filtering_predicate_pushdown(tmp_path: Path) -> None:
+ """
+ Test ORC row filtering and predicate pushdown functionality.
+ To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_row_filtering_predicate_pushdown
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+    from pyiceberg.expressions import And, EqualTo, GreaterThan, In, LessThan, Or
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import BooleanType, IntegerType, StringType
+
+ # Define schema and data with more complex data for filtering
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "name", StringType(), required=False),
+ NestedField(3, "age", IntegerType(), required=True),
+ NestedField(4, "active", BooleanType(), required=True),
+ )
+
+ # Create data with various values for filtering
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3, 4, 5], type=pa.int32()),
+ "name": ["alice", "bob", "charlie", "david", "eve"],
+ "age": pa.array([25, 30, 35, 40, 45], type=pa.int32()),
+ "active": [True, False, True, True, False],
+ }
+ )
+
+ # Create ORC file
+ orc_path = tmp_path / "filter_test.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=4,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["name"]}, {"field-id": 3, "names": ["age"]}, {"field-id": 4, "names": ["active"]}]',
+ },
+ )
+ io = PyArrowFileIO()
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=5,
+ column_sizes={1: 20, 2: 50, 3: 20, 4: 10},
+ value_counts={1: 5, 2: 5, 3: 5, 4: 5},
+ null_value_counts={1: 0, 2: 0, 3: 0, 4: 0},
+ nan_value_counts={1: 0, 2: 0, 3: 0, 4: 0},
+        lower_bounds={1: b"\x01\x00\x00\x00", 2: b"alice", 3: b"\x19\x00\x00\x00", 4: b"\x00"},
+        upper_bounds={1: b"\x05\x00\x00\x00", 2: b"eve", 3: b"\x2d\x00\x00\x00", 4: b"\x01"},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+
+ scan_task = FileScanTask(data_file=data_file)
+
+ # Test 1: Simple equality filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=EqualTo("id", 3),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 1
+ assert result.column("id").to_pylist() == [3]
+ assert result.column("name").to_pylist() == ["charlie"]
+
+ # Test 2: Range filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=And(GreaterThan("age", 30), LessThan("age", 45)),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 2
+ assert set(result.column("id").to_pylist()) == {3, 4}
+
+ # Test 3: String filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=EqualTo("name", "bob"),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 1
+ assert result.column("name").to_pylist() == ["bob"]
+
+ # Test 4: Boolean filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=EqualTo("active", True),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 3
+ assert set(result.column("id").to_pylist()) == {1, 3, 4}
+
+ # Test 5: IN filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=In("id", [1, 3, 5]),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 3
+ assert set(result.column("id").to_pylist()) == {1, 3, 5}
+
+ # Test 6: Complex AND/OR filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=Or(And(EqualTo("active", True), GreaterThan("age", 30)),
EqualTo("name", "bob")),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 3
+    assert set(result.column("id").to_pylist()) == {2, 3, 4}  # bob, charlie, david
+
+
+def test_orc_record_batching_streaming(tmp_path: Path) -> None:
+ """
+    Test ORC record batching and streaming functionality with multiple files and fragments.
+    This test validates that we get the expected number of batches based on file scan tasks
+    and ORC fragments, providing end-to-end validation of the batching behavior.
+ To run just this test:
+ pytest tests/io/test_pyarrow.py -k test_orc_record_batching_streaming
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "value", StringType(), required=False),
+ )
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["value"]}]',
+ },
+ )
+ io = PyArrowFileIO()
+
+ # Test with larger files to better demonstrate batching behavior
+    # PyArrow default batch size is typically 1024 rows, so we'll create files larger than that
+    num_files = 2
+    rows_per_file = 2000  # Larger than default batch size to ensure multiple batches per file
+ total_rows = num_files * rows_per_file
+
+ scan_tasks = []
+ for file_idx in range(num_files):
+ # Create data for this file
+ start_id = file_idx * rows_per_file + 1
+ end_id = (file_idx + 1) * rows_per_file
+ data = pa.table(
+ {
+ "id": pa.array(range(start_id, end_id + 1), type=pa.int32()),
+                "value": [f"file_{file_idx}_value_{i}" for i in range(start_id, end_id + 1)],
+ }
+ )
+
+ # Create ORC file
+ orc_path = tmp_path / f"batch_test_{file_idx}.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=rows_per_file,
+ column_sizes={1: 8000, 2: 16000},
+ value_counts={1: rows_per_file, 2: rows_per_file},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+            lower_bounds={1: start_id.to_bytes(4, "little"), 2: f"file_{file_idx}_value_{start_id}".encode()},
+            upper_bounds={1: end_id.to_bytes(4, "little"), 2: f"file_{file_idx}_value_{end_id}".encode()},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+ scan_tasks.append(FileScanTask(data_file=data_file))
+
+ # Test 1: Multiple file batching - verify we get batches from all files
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches(scan_tasks))
+
+ # Verify we get the expected number of batches
+ # Based on our testing, PyArrow creates 1 batch per file
+ expected_batches = num_files # 1 batch per file
+    assert len(batches) == expected_batches, f"Expected {expected_batches} batches (1 per file), got {len(batches)}"
+
+ # Verify batch sizes are reasonable (not too large)
+ max_batch_size = max(batch.num_rows for batch in batches)
+    assert max_batch_size <= 2000, f"Batch size {max_batch_size} seems too large for ORC files"
+ assert max_batch_size > 0, "Batch should not be empty"
+
+ # We shouldn't get more batches than total rows (one batch per row maximum)
+    assert len(batches) <= total_rows, f"Expected at most {total_rows} batches (one per row), got {len(batches)}"
+
+ # Verify all batches are RecordBatch objects
+ for batch in batches:
+        assert isinstance(batch, pa.RecordBatch), f"Expected RecordBatch, got {type(batch)}"
+        assert batch.num_columns == 2, f"Expected 2 columns, got {batch.num_columns}"
+ assert "id" in batch.schema.names, "Missing 'id' column"
+ assert "value" in batch.schema.names, "Missing 'value' column"
+
+ # Test 2: Verify data integrity across all batches from all files
+    rows_in_batches = sum(batch.num_rows for batch in batches)
+    assert rows_in_batches == total_rows, f"Expected {total_rows} rows total, got {rows_in_batches}"
+
+ # Collect all data from batches and verify it spans all files
+ all_ids = []
+ all_values = []
+ for batch in batches:
+ all_ids.extend(batch.column("id").to_pylist())
+ all_values.extend(batch.column("value").to_pylist())
+
+ # Verify we have data from all files
+ expected_ids = list(range(1, total_rows + 1))
+    assert sorted(all_ids) == expected_ids, f"ID data doesn't match expected range 1-{total_rows}"
+
+ # Verify values contain data from all files
+ file_values = set()
+ for value in all_values:
+ if value.startswith("file_"):
+ file_idx = int(value.split("_")[1])
+ file_values.add(file_idx)
+    assert file_values == set(range(num_files)), f"Expected values from all {num_files} files, got from files: {file_values}"
+
+ # Test 3: Verify batch distribution across files
+ # Each file should contribute at least one batch
+ batch_sizes = [batch.num_rows for batch in batches]
+ total_batch_rows = sum(batch_sizes)
+    assert total_batch_rows == total_rows, f"Total batch rows {total_batch_rows} != expected {total_rows}"
+
+ # Verify we have reasonable batch sizes (not too small, not too large)
+ for batch_size in batch_sizes:
+ assert batch_size > 0, "Batch should not be empty"
+        assert batch_size <= total_rows, f"Batch size {batch_size} should not exceed total rows {total_rows}"
+
+ # Test 4: Streaming behavior with multiple files
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ processed_rows = 0
+ batch_count = 0
+ file_data_counts = dict.fromkeys(range(num_files), 0)
+
+ for batch in scan.to_record_batches(scan_tasks):
+ batch_count += 1
+ processed_rows += batch.num_rows
+
+ # Count rows per file in this batch
+ for value in batch.column("value").to_pylist():
+ if value.startswith("file_"):
+ file_idx = int(value.split("_")[1])
+ file_data_counts[file_idx] += 1
+
+    # PyArrow may optimize batching, so we just verify we get reasonable batching
+    assert batch_count >= 1, f"Expected at least 1 batch, got {batch_count}"
+    assert batch_count <= num_files, f"Expected at most {num_files} batches (1 per file), got {batch_count}"
+    assert processed_rows == total_rows, f"Processed {processed_rows} rows, expected {total_rows}"
+
+ # Verify each file contributed data
+ for file_idx in range(num_files):
+ assert file_data_counts[file_idx] == rows_per_file, (
+            f"File {file_idx} contributed {file_data_counts[file_idx]} rows, expected {rows_per_file}"
+ )
+
+ # Test 5: Column projection with multiple files
+ projected_schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ )
+
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=projected_schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches(scan_tasks))
+    assert len(batches) >= 1, f"Expected at least 1 batch for projected schema, got {len(batches)}"
+
+ for batch in batches:
+        assert batch.num_columns == 1, f"Expected 1 column after projection, got {batch.num_columns}"
+        assert "id" in batch.schema.names, "Missing 'id' column after projection"
+        assert "value" not in batch.schema.names, "Should not have 'value' column after projection"
+
+ # Test 6: Filtering with multiple files
+ from pyiceberg.expressions import GreaterThan
+
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+        row_filter=GreaterThan("id", total_rows // 2),  # Filter to second half of data
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches(scan_tasks))
+ total_filtered_rows = sum(batch.num_rows for batch in batches)
+ expected_filtered = total_rows // 2
+ assert total_filtered_rows == expected_filtered, (
+        f"Expected {expected_filtered} rows after filtering, got {total_filtered_rows}"
+ )
+
+ # Verify all returned IDs are in the filtered range
+ for batch in batches:
+ ids = batch.column("id").to_pylist()
+        assert all(id_val > total_rows // 2 for id_val in ids), f"Found ID <= {total_rows // 2}: {ids}"
+
+ # Test 7: Verify batch count matches expected pattern
+    # The number of batches should be >= number of files (one batch per file minimum)
+    # and could be more if ORC creates multiple fragments per file
+    print(f"Generated {len(batches)} batches from {num_files} files with {total_rows} total rows")
+    print(f"Batch sizes: {[batch.num_rows for batch in batches]}")
+    print(f"Average batch size: {total_rows / len(batches):.1f} rows per batch")
+
+    # This validates the end-to-end batching behavior as requested in the PR comment
+    # We expect multiple batches based on file size and configured batch size
+    print(f"Expected at least {num_files} batches (1 per file) with PyArrow's default batching")
+ print(f"Actual batch count: {len(batches)}")
+ print(f"Batch sizes: {[batch.num_rows for batch in batches]}")
+
+ # Verify we get reasonable batching behavior
+ assert len(batches) >= 1, f"Expected at least 1 batch, got {len(batches)}"
+    assert len(batches) <= total_rows, f"Expected at most {total_rows} batches (one per row), got {len(batches)}"
+
+
+def test_orc_batching_exact_counts_single_file(tmp_path: Path) -> None:
+ """
+ Test exact batch counts for single ORC files of different sizes.
+    This test explicitly verifies the number of batches PyArrow creates for different file sizes.
+    Note: Uses default ORC writing (1 stripe per file), so expects 1 batch per file.
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_batching_exact_counts_single_file
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "value", StringType(), required=False),
+ )
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["value"]}]',
+ },
+ )
+ io = PyArrowFileIO()
+
+ # Test different file sizes to understand PyArrow's batching behavior
+ # Note: All files will have 1 stripe (default ORC writing), so 1 batch each
+ test_cases = [
+ (500, "Small file (1 stripe)"),
+ (1000, "Medium file (1 stripe)"),
+ (2000, "Large file (1 stripe)"),
+ (5000, "Very large file (1 stripe)"),
+ ]
+
+ for num_rows, description in test_cases:
+ print(f"\n=== Testing {description} with {num_rows} rows ===")
+
+ # Create data
+ data = pa.table(
+            {"id": pa.array(range(1, num_rows + 1), type=pa.int32()), "value": [f"value_{i}" for i in range(1, num_rows + 1)]}
+ )
+
+ # Create ORC file
+ orc_path = tmp_path / f"test_{num_rows}_rows.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=num_rows,
+ column_sizes={1: num_rows * 4, 2: num_rows * 8},
+ value_counts={1: num_rows, 2: num_rows},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"value_1"},
+            upper_bounds={1: num_rows.to_bytes(4, "little"), 2: f"value_{num_rows}".encode()},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+ scan_task = FileScanTask(data_file=data_file)
+
+ # Test batching behavior
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches([scan_task]))
+
+ print(f" Rows: {num_rows}")
+ print(f" Batches: {len(batches)}")
+ print(f" Batch sizes: {[batch.num_rows for batch in batches]}")
+
+ # Verify exact batch count and sizes
+ total_batch_rows = sum(batch.num_rows for batch in batches)
+        assert total_batch_rows == num_rows, f"Total rows mismatch: expected {num_rows}, got {total_batch_rows}"
+
+ # Verify data integrity
+ all_ids = []
+ for batch in batches:
+ all_ids.extend(batch.column("id").to_pylist())
+        assert sorted(all_ids) == list(range(1, num_rows + 1)), f"Data integrity check failed for {num_rows} rows"
+
+        print(f"  ✓ {description}: {len(batches)} batches, {total_batch_rows} total rows")
+
+
+def test_orc_batching_exact_counts_multiple_files(tmp_path: Path) -> None:
+ """
+    Test exact batch counts for multiple ORC files of different sizes and counts.
+    This test explicitly verifies the number of batches PyArrow creates for different file configurations.
+    Note: Uses default ORC writing (1 stripe per file), so expects 1 batch per file.
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_batching_exact_counts_multiple_files
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "value", StringType(), required=False),
+ )
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["value"]}]',
+ },
+ )
+ io = PyArrowFileIO()
+
+    # Test different file configurations to understand PyArrow's batching behavior
+    # Note: All files will have 1 stripe each (default ORC writing), so 1 batch per file
+ test_cases = [
+ (2, 500, "2 files, 500 rows each (1 stripe each)"),
+ (3, 1000, "3 files, 1000 rows each (1 stripe each)"),
+ (4, 750, "4 files, 750 rows each (1 stripe each)"),
+ (2, 2000, "2 files, 2000 rows each (1 stripe each)"),
+ ]
+
+ for num_files, rows_per_file, description in test_cases:
+ print(f"\n=== Testing {description} ===")
+
+ total_rows = num_files * rows_per_file
+ scan_tasks = []
+
+ for file_idx in range(num_files):
+ # Create data for this file
+ start_id = file_idx * rows_per_file + 1
+ end_id = (file_idx + 1) * rows_per_file
+ data = pa.table(
+ {
+                    "id": pa.array(range(start_id, end_id + 1), type=pa.int32()),
+                    "value": [f"file_{file_idx}_value_{i}" for i in range(start_id, end_id + 1)],
+ }
+ )
+
+ # Create ORC file
+            orc_path = tmp_path / f"multi_test_{file_idx}_{rows_per_file}_rows.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=rows_per_file,
+ column_sizes={1: rows_per_file * 4, 2: rows_per_file * 8},
+ value_counts={1: rows_per_file, 2: rows_per_file},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+                lower_bounds={1: start_id.to_bytes(4, "little"), 2: f"file_{file_idx}_value_{start_id}".encode()},
+                upper_bounds={1: end_id.to_bytes(4, "little"), 2: f"file_{file_idx}_value_{end_id}".encode()},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+ scan_tasks.append(FileScanTask(data_file=data_file))
+
+ # Test batching behavior across multiple files
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches(scan_tasks))
+
+ print(f" Files: {num_files}")
+ print(f" Rows per file: {rows_per_file}")
+ print(f" Total rows: {total_rows}")
+ print(f" Batches: {len(batches)}")
+ print(f" Batch sizes: {[batch.num_rows for batch in batches]}")
+
+ # Verify exact batch count and sizes
+ total_batch_rows = sum(batch.num_rows for batch in batches)
+        assert total_batch_rows == total_rows, f"Total rows mismatch: expected {total_rows}, got {total_batch_rows}"
+
+ # Verify data spans all files
+ all_ids = []
+ file_data_counts = dict.fromkeys(range(num_files), 0)
+
+ for batch in batches:
+ batch_ids = batch.column("id").to_pylist()
+ all_ids.extend(batch_ids)
+
+ # Count rows per file in this batch
+ for value in batch.column("value").to_pylist():
+ if value.startswith("file_"):
+ file_idx = int(value.split("_")[1])
+ file_data_counts[file_idx] += 1
+
+ # Verify we have data from all files
+        assert sorted(all_ids) == list(range(1, total_rows + 1)), f"Data integrity check failed for {description}"
+
+ # Verify each file contributed data
+ for file_idx in range(num_files):
+ assert file_data_counts[file_idx] == rows_per_file, (
+                f"File {file_idx} contributed {file_data_counts[file_idx]} rows, expected {rows_per_file}"
+ )
+
+        print(f"  ✓ {description}: {len(batches)} batches, {total_batch_rows} total rows")
+
+
+def test_orc_field_id_extraction() -> None:
+ """
+ Test ORC field ID extraction from PyArrow field metadata.
+ To run just this test:
+ pytest tests/io/test_pyarrow.py -k test_orc_field_id_extraction
+ """
+ import pyarrow as pa
+
+    from pyiceberg.io.pyarrow import ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY, _get_field_id
+
+ # Test 1: Parquet field ID extraction
+    field_parquet = pa.field("test_parquet", pa.string(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"123"})
+ field_id = _get_field_id(field_parquet)
+ assert field_id == 123, f"Expected Parquet field ID 123, got {field_id}"
+
+ # Test 2: ORC field ID extraction
+    field_orc = pa.field("test_orc", pa.string(), metadata={ORC_FIELD_ID_KEY: b"456"})
+ field_id = _get_field_id(field_orc)
+ assert field_id == 456, f"Expected ORC field ID 456, got {field_id}"
+
+ # Test 3: No field ID
+ field_no_id = pa.field("test_no_id", pa.string())
+ field_id = _get_field_id(field_no_id)
+    assert field_id is None, f"Expected None for field without ID, got {field_id}"
+
+ # Test 4: Both field IDs present (should prefer Parquet)
+    field_both = pa.field("test_both", pa.string(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"123", ORC_FIELD_ID_KEY: b"456"})
+    field_id = _get_field_id(field_both)
+    assert field_id == 123, f"Expected Parquet field ID 123 (preferred), got {field_id}"
+
+ # Test 5: Empty metadata
+ field_empty_metadata = pa.field("test_empty", pa.string(), metadata={})
+ field_id = _get_field_id(field_empty_metadata)
+    assert field_id is None, f"Expected None for field with empty metadata, got {field_id}"
+
+ # Test 6: Invalid field ID format
+    field_invalid = pa.field("test_invalid", pa.string(), metadata={ORC_FIELD_ID_KEY: b"not_a_number"})
+ try:
+ field_id = _get_field_id(field_invalid)
+ raise AssertionError("Expected ValueError for invalid field ID format")
+ except ValueError:
+ pass # Expected behavior
+
+ # Test 7: Different data types
+    field_int = pa.field("test_int", pa.int32(), metadata={ORC_FIELD_ID_KEY: b"789"})
+    field_id = _get_field_id(field_int)
+    assert field_id == 789, f"Expected ORC field ID 789 for int field, got {field_id}"
+
+    field_bool = pa.field("test_bool", pa.bool_(), metadata={ORC_FIELD_ID_KEY: b"101"})
+    field_id = _get_field_id(field_bool)
+    assert field_id == 101, f"Expected ORC field ID 101 for bool field, got {field_id}"
+
+
+def test_orc_schema_with_field_ids(tmp_path: Path) -> None:
+ """
+ Test ORC reading with actual field IDs embedded in the schema.
+    This test creates an ORC file with field IDs and reads it without name mapping.
+ To run just this test:
+ pytest tests/io/test_pyarrow.py -k test_orc_schema_with_field_ids
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ORC_FIELD_ID_KEY, ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "name", StringType(), required=False),
+ )
+
+ # Create PyArrow schema with ORC field IDs
+ arrow_schema = pa.schema(
+ [
+ pa.field("id", pa.int32(), metadata={ORC_FIELD_ID_KEY: b"1"}),
+ pa.field("name", pa.string(), metadata={ORC_FIELD_ID_KEY: b"2"}),
+ ]
+ )
+
+ # Create data with the schema that has field IDs
+    data = pa.table({"id": pa.array([1, 2, 3], type=pa.int32()), "name": ["alice", "bob", "charlie"]}, schema=arrow_schema)
+
+ # Create ORC file
+ orc_path = tmp_path / "field_id_test.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create table metadata WITHOUT name mapping (should work with field IDs)
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+ # No name mapping - should work with field IDs
+ },
+ )
+ io = PyArrowFileIO()
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=3,
+ column_sizes={1: 12, 2: 30},
+ value_counts={1: 3, 2: 3},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"alice"},
+ upper_bounds={1: b"\x03\x00\x00\x00", 2: b"charlie"},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+
+ # Read back using ArrowScan - should work without name mapping
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+ scan_task = FileScanTask(data_file=data_file)
+ table_read = scan.to_table([scan_task])
+
+ # Verify the data was read correctly
+ assert table_read.num_rows == 3
+ assert table_read.num_columns == 2
+ assert table_read.column_names == ["id", "name"]
+
+ # Verify data matches
+ assert table_read.column("id").to_pylist() == [1, 2, 3]
+ assert table_read.column("name").to_pylist() == ["alice", "bob", "charlie"]
+
+
+def test_orc_schema_conversion_with_field_ids() -> None:
+ """
+    Test that schema_to_pyarrow correctly adds ORC field IDs when file_format is specified.
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_schema_conversion_with_field_ids
+ """
+    from pyiceberg.io.pyarrow import ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY, schema_to_pyarrow
+ from pyiceberg.manifest import FileFormat
+ from pyiceberg.schema import Schema
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "name", StringType(), required=False),
+ )
+
+ # Test 1: Default behavior (should add Parquet field IDs)
+ arrow_schema_default = schema_to_pyarrow(schema, include_field_ids=True)
+
+ id_field = arrow_schema_default.field(0) # id field
+ name_field = arrow_schema_default.field(1) # name field
+
+ assert id_field.metadata[PYARROW_PARQUET_FIELD_ID_KEY] == b"1"
+ assert name_field.metadata[PYARROW_PARQUET_FIELD_ID_KEY] == b"2"
+ assert ORC_FIELD_ID_KEY not in id_field.metadata
+ assert ORC_FIELD_ID_KEY not in name_field.metadata
+
+ # Test 2: Explicitly specify ORC format
+    arrow_schema_orc = schema_to_pyarrow(schema, include_field_ids=True, file_format=FileFormat.ORC)
+
+ id_field_orc = arrow_schema_orc.field(0) # id field
+ name_field_orc = arrow_schema_orc.field(1) # name field
+
+ assert id_field_orc.metadata[ORC_FIELD_ID_KEY] == b"1"
+ assert name_field_orc.metadata[ORC_FIELD_ID_KEY] == b"2"
+ assert PYARROW_PARQUET_FIELD_ID_KEY not in id_field_orc.metadata
+ assert PYARROW_PARQUET_FIELD_ID_KEY not in name_field_orc.metadata
+
+ # Test 3: No field IDs
+    arrow_schema_no_ids = schema_to_pyarrow(schema, include_field_ids=False, file_format=FileFormat.ORC)
+
+ id_field_no_ids = arrow_schema_no_ids.field(0)
+ name_field_no_ids = arrow_schema_no_ids.field(1)
+
+ assert not id_field_no_ids.metadata
+ assert not name_field_no_ids.metadata
+
+
+def test_orc_batching_behavior_documentation(tmp_path: Path) -> None:
+ """
+ Document and verify PyArrow's exact batching behavior for ORC files.
+    This test serves as comprehensive documentation of how PyArrow batches ORC files.
+
+    ORC BATCHING BEHAVIOR SUMMARY:
+    =============================
+
+    1. STRIPE-BASED BATCHING:
+    - PyArrow creates exactly 1 batch per ORC stripe
+    - This is similar to how Parquet creates 1 batch per row group
+    - Number of batches = Number of stripes in the ORC file
+
+    2. DEFAULT BEHAVIOR:
+    - Default ORC writing creates 1 stripe per file (64MB default stripe size)
+    - Therefore, most ORC files have 1 batch per file by default
+    - This is why many tests show "1 batch per file" behavior
+
+    3. CONFIGURABLE BATCHING:
+    - ORC CAN have multiple batches per file when configured with multiple stripes
+    - Use stripe_size parameter when writing ORC files to control batching
+    - stripe_size < 200KB: PyArrow ignores the parameter, uses default 1024 rows per stripe
+    - stripe_size >= 200KB: PyArrow respects the parameter and creates stripes accordingly
+
+    4. PYARROW CONFIGURATION:
+    - PyIceberg sets buffer_size=8MB for both Parquet and ORC
+    - Parquet: Gets the 8MB buffer_size parameter (ds.ParquetFileFormat supports it)
+    - ORC: Ignores the 8MB buffer_size parameter (ds.OrcFileFormat doesn't support it)
+    - This means ORC uses PyArrow's default batching behavior (based on stripes)
+
+    5. KEY DIFFERENCES FROM PARQUET:
+    - Parquet: 1 FileScanTask → 1 File → 1 PyArrow Fragment → N Batches (based on row groups)
+    - ORC: 1 FileScanTask → 1 File → 1 PyArrow Fragment → N Batches (based on stripes)
+    - Both formats support multiple batches per file when configured properly
+    - The difference is in default configuration, not fundamental behavior
+
+    6. TESTING IMPLICATIONS:
+    - Tests using default ORC writing will show 1 batch per file
+    - Tests using custom stripe_size >= 200KB will show multiple batches per file
+    - Always verify the actual number of stripes in ORC files when testing batching
+
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_batching_behavior_documentation
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "value", StringType(), required=False),
+ )
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["value"]}]',
+ },
+ )
+ io = PyArrowFileIO()
+
+ print("\n=== PyArrow ORC Batching Behavior Documentation ===")
+ print("Based on testing, PyArrow's batching behavior for ORC files is:")
+ print("1. Single file: Creates 1 batch per stripe (not per file)")
+ print("2. Multiple files: Creates 1 batch per stripe per file")
+ print("3. Batch size: Each batch contains rows from one stripe")
+    print("4. Stripe-based batching: PyArrow creates multiple batches when ORC has multiple stripes")
+    print("5. Default behavior: Most ORC files have 1 stripe, so 1 batch per file")
+ print()
+
+    # Test cases that document the exact behavior (using default ORC writing = 1 stripe per file)
+ test_cases = [
+ # (file_count, rows_per_file, expected_batches, description)
+ (1, 100, 1, "Single small file (1 stripe)"),
+ (1, 1000, 1, "Single medium file (1 stripe)"),
+ (1, 5000, 1, "Single large file (1 stripe)"),
+ (2, 500, 2, "Two small files (1 stripe each)"),
+ (3, 1000, 3, "Three medium files (1 stripe each)"),
+ (4, 750, 4, "Four small files (1 stripe each)"),
+ (2, 2000, 2, "Two large files (1 stripe each)"),
+ ]
+
+ for file_count, rows_per_file, expected_batches, description in test_cases:
+        print(f"Testing: {description} ({file_count} files × {rows_per_file} rows)")
+
+ total_rows = file_count * rows_per_file
+ scan_tasks = []
+
+ for file_idx in range(file_count):
+ # Create data for this file
+ start_id = file_idx * rows_per_file + 1
+ end_id = (file_idx + 1) * rows_per_file
+ data = pa.table(
+ {
+                    "id": pa.array(range(start_id, end_id + 1), type=pa.int32()),
+                    "value": [f"file_{file_idx}_value_{i}" for i in range(start_id, end_id + 1)],
+ }
+ )
+
+ # Create ORC file
+            orc_path = tmp_path / f"doc_test_{file_idx}_{rows_per_file}_rows.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=rows_per_file,
+ column_sizes={1: rows_per_file * 4, 2: rows_per_file * 8},
+ value_counts={1: rows_per_file, 2: rows_per_file},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+                lower_bounds={1: start_id.to_bytes(4, "little"), 2: f"file_{file_idx}_value_{start_id}".encode()},
+                upper_bounds={1: end_id.to_bytes(4, "little"), 2: f"file_{file_idx}_value_{end_id}".encode()},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+ scan_tasks.append(FileScanTask(data_file=data_file))
+
+ # Test batching behavior
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches(scan_tasks))
+
+ # Verify exact batch count
+        assert len(batches) == expected_batches, f"Expected {expected_batches} batches, got {len(batches)} for {description}"
+
+ # Verify total rows
+ total_batch_rows = sum(batch.num_rows for batch in batches)
+        assert total_batch_rows == total_rows, f"Total rows mismatch: expected {total_rows}, got {total_batch_rows}"
+
+ # Verify data integrity
+ all_ids = []
+ for batch in batches:
+ all_ids.extend(batch.column("id").to_pylist())
+        assert sorted(all_ids) == list(range(1, total_rows + 1)), f"Data integrity check failed for {description}"
+
+        print(f"  ✓ {description}: {len(batches)} batches, {total_batch_rows} total rows")
+
+ print("\n=== Summary ===")
+    print("With default ORC writing (1 stripe per file), PyArrow's batching is simple and predictable:")
+    print("- 1 batch per file, regardless of file size")
+    print("- No internal file splitting")
+    print("- Batch size = file size (number of rows in the file)")
+    print("- Total batches = number of files")
+    print("- This behavior is consistent across all file sizes tested (100-5000 rows)")
+
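+
The stripe-size behavior described in the docstring above can be checked directly with pyarrow.orc. A small sketch under the assumption that the installed PyArrow honors stripe_size at roughly 200KB and above (the row count and stripe_size here are arbitrary, and the exact stripe count may vary by PyArrow version):

    import tempfile
    from pathlib import Path

    import pyarrow as pa
    import pyarrow.orc as orc

    with tempfile.TemporaryDirectory() as tmp:
        path = str(Path(tmp) / "striped.orc")
        data = pa.table({"id": pa.array(range(100_000), type=pa.int32())})

        # The number of stripes written here is what drives the number of
        # record batches PyArrow yields when scanning the file.
        orc.write_table(data, path, stripe_size=200_000, compression="uncompressed")

        orc_file = orc.ORCFile(path)
        print("stripes:", orc_file.nstripes)
        print("rows per stripe:", [orc_file.read_stripe(i).num_rows for i in range(orc_file.nstripes)])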
+
+def test_parquet_vs_orc_batching_behavior_comparison(tmp_path: Path) -> None:
+ """
+ Compare Parquet vs ORC batching behavior to document the key differences.
+
+ Key differences:
+    - PARQUET: 1 FileScanTask → 1 File → 1 PyArrow Fragment → N Batches (based on row groups)
+    - ORC: 1 FileScanTask → 1 File → 1 PyArrow Fragment → N Batches (based on stripes)
+
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_parquet_vs_orc_batching_behavior_comparison
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+ import pyarrow.parquet as pq
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "value", StringType(), required=False),
+ )
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["value"]}]',
+ },
+ )
+ io = PyArrowFileIO()
+
+ print("\n=== PyArrow Batching Behavior Comparison ===")
+    print("PARQUET: 1 FileScanTask → 1 File → 1 PyArrow Fragment → N Batches (based on row groups)")
+    print("ORC: 1 FileScanTask → 1 File → 1 PyArrow Fragment → N Batches (based on stripes)")
+ print()
+
+ # Test Parquet with different row group sizes
+ print("Testing PARQUET batching with different row group sizes:")
+ parquet_test_cases = [
+ (1000, "Small row groups"),
+ (2000, "Medium row groups"),
+ (5000, "Large row groups"),
+ ]
+
+ for row_group_size, description in parquet_test_cases:
+ # Create data
+        data = pa.table({"id": pa.array(range(1, 10001), type=pa.int32()), "value": [f"value_{i}" for i in range(1, 10001)]})
+
+ # Create Parquet file with specific row group size
+ parquet_path = tmp_path / f"parquet_test_{row_group_size}.parquet"
+ pq.write_table(data, str(parquet_path), row_group_size=row_group_size)
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(parquet_path),
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ file_size_in_bytes=parquet_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=10000,
+ column_sizes={1: 40000, 2: 80000},
+ value_counts={1: 10000, 2: 10000},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"value_1"},
+ upper_bounds={1: b"\x10'\x00\x00", 2: b"value_10000"},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+ scan_task = FileScanTask(data_file=data_file)
+
+ # Test batching behavior
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches([scan_task]))
+ expected_batches = 10000 // row_group_size # Number of row groups
+
+        print(f"  {description} (row_group_size={row_group_size}): {len(batches)} batches, sizes={[b.num_rows for b in batches]}")
+
+ # Verify exact batch count based on row groups
+ assert len(batches) == expected_batches, (
+            f"Expected {expected_batches} batches for row_group_size={row_group_size}, got {len(batches)}"
+ )
+
+ # Verify total rows
+ total_rows = sum(batch.num_rows for batch in batches)
+        assert total_rows == 10000, f"Expected 10000 total rows, got {total_rows}"
+
+ print()
+ print("Testing ORC batching with different file sizes:")
+ orc_test_cases = [
+ (1000, "Small file"),
+ (5000, "Medium file"),
+ (10000, "Large file"),
+ ]
+
+ for file_size, description in orc_test_cases:
+ # Create data
+ data = pa.table(
+            {"id": pa.array(range(1, file_size + 1), type=pa.int32()), "value": [f"value_{i}" for i in range(1, file_size + 1)]}
+ )
+
+ # Create ORC file
+ orc_path = tmp_path / f"orc_test_{file_size}.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=file_size,
+ column_sizes={1: file_size * 4, 2: file_size * 8},
+ value_counts={1: file_size, 2: file_size},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"value_1"},
+            upper_bounds={1: file_size.to_bytes(4, "little"), 2: f"value_{file_size}".encode()},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+ scan_task = FileScanTask(data_file=data_file)
+
+ # Test batching behavior
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches([scan_task]))
+
+        print(f"  {description} (file_size={file_size}): {len(batches)} batches, sizes={[b.num_rows for b in batches]}")
+
+        # Verify ORC creates 1 batch per file (with default stripe configuration)
+        # Note: This is because default ORC writing creates 1 stripe per file
+        assert len(batches) == 1, (
+            f"Expected 1 batch for ORC file with {file_size} rows (default stripe config), got {len(batches)}"
+ )
+
+ # Verify total rows
+ total_rows = sum(batch.num_rows for batch in batches)
+        assert total_rows == file_size, f"Expected {file_size} total rows, got {total_rows}"
+
+ print()
+ print("=== Summary ===")
+ print("PARQUET batching is based on ROW GROUPS:")
+ print("- Number of batches = Number of row groups in the Parquet file")
+ print("- Row group size is configurable when writing Parquet files")
+ print("- PyArrow's scanner creates one batch per row group")
+ print()
+ print("ORC batching is based on STRIPES (like Parquet row groups):")
+ print("- Number of batches = Number of stripes in the ORC file")
+ print("- ORC files with multiple stripes create multiple batches")
+ print("- ORC files with one stripe create one batch")
+ print("- PyArrow's scanner creates one batch per ORC stripe")
+ print()
+ print("This explains why:")
+ print("- Parquet can have multiple batches per file (based on row groups)")
+ print("- ORC can have multiple batches per file (based on stripes)")
+    print("- Both formats use similar batching strategies when configured properly")
+    print("- The difference is in default configuration, not fundamental behavior")
+ print()
+ print("IMPORTANT: PyIceberg Configuration Details:")
+ print("- PyIceberg sets buffer_size=8MB for both Parquet and ORC")
+    print("- Parquet: Gets the 8MB buffer_size parameter (ds.ParquetFileFormat supports it)")
+    print("- ORC: Ignores the 8MB buffer_size parameter (ds.OrcFileFormat doesn't support it)")
+    print("- This means ORC uses PyArrow's default batching behavior (1 batch per file)")
+    print("- Parquet uses the configured buffer size, but batching is still based on row groups")
+ print()
+ print("CORRECTED UNDERSTANDING:")
+ print("- Both Parquet and ORC support multiple batches per file")
+ print("- Parquet: Based on row groups (configurable when writing)")
+ print("- ORC: Based on stripes (configurable when writing)")
+    print("- The key difference is default configuration, not fundamental capability")
+
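+
To make the row-group vs stripe comparison concrete outside of ArrowScan, a standalone sketch that writes the same data both ways and inspects the counts that determine batching (sizes are arbitrary):

    import tempfile
    from pathlib import Path

    import pyarrow as pa
    import pyarrow.orc as orc
    import pyarrow.parquet as pq

    with tempfile.TemporaryDirectory() as tmp:
        data = pa.table({"id": pa.array(range(10_000), type=pa.int32())})
        parquet_path = str(Path(tmp) / "cmp.parquet")
        orc_path = str(Path(tmp) / "cmp.orc")

        # Parquet: row_group_size controls how many batches a scan yields per file.
        pq.write_table(data, parquet_path, row_group_size=2_000)
        # ORC: default writer settings typically produce a single stripe for data this small.
        orc.write_table(data, orc_path)

        print("parquet row groups:", pq.ParquetFile(parquet_path).metadata.num_row_groups)
        print("orc stripes:", orc.ORCFile(orc_path).nstripes)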
+
+def test_orc_stripe_size_batch_size_compression_interaction(tmp_path: Path) -> None:
+ """
+    Test that demonstrates how stripe size, batch size, and compression interact
+ to affect ORC batching behavior.
+
+ This test shows:
+ 1. How stripe_size affects the number of stripes (and therefore batches)
+ 2. How batch_size affects the number of stripes when stripe_size is small
+ 3. How compression affects both stripe count and file size
+ 4. The relationship between uncompressed target size and actual file size
+
+ To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_stripe_size_batch_size_compression_interaction
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+
+ from pyiceberg.expressions import AlwaysTrue
+ from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
+ from pyiceberg.manifest import DataFileContent, FileFormat
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.schema import Schema
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.types import IntegerType, StringType
+
+ # Define schema
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "value", StringType(), required=False),
+ )
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["value"]}]',
+ },
+ )
+ io = PyArrowFileIO()
+
+    print("\n=== ORC Stripe Size, Batch Size, and Compression Interaction Test ===")
+ print("Demonstrating how these parameters affect ORC batching behavior")
+ print()
+
+ # Create test data
+    data = pa.table({"id": pa.array(range(1, 10001), type=pa.int32()), "value": [f"value_{i}" for i in range(1, 10001)]})
+
+ print("Test data analysis:")
+ raw_size = len(data) * 15 # 4 bytes (int32) + 11 bytes (string) per row
+ print(f" Total rows: {len(data)}")
+ print(f" Raw data size: {raw_size:,} bytes ({raw_size / 1024:.1f}KB)")
+ print(f" Estimated bytes per row (raw): {raw_size / len(data):.1f} bytes")
+ print()
+
+ # Test different combinations
+ test_cases = [
+ # (stripe_size, batch_size, compression, description)
+        (200000, None, "uncompressed", "200KB stripe, no batch limit, uncompressed"),
+        (200000, 1000, "uncompressed", "200KB stripe, 1000 batch, uncompressed"),
+        (200000, None, "snappy", "200KB stripe, no batch limit, snappy"),
+        (100000, None, "uncompressed", "100KB stripe, no batch limit, uncompressed"),
+        (500000, None, "uncompressed", "500KB stripe, no batch limit, uncompressed"),
+        (None, 1000, "uncompressed", "No stripe limit, 1000 batch, uncompressed"),
+        (None, 2000, "uncompressed", "No stripe limit, 2000 batch, uncompressed"),
+ ]
+
+ for stripe_size, batch_size, compression, description in test_cases:
+ print(f"Testing: {description}")
+
+ # Create ORC file with specific parameters
+ orc_path = tmp_path / f"orc_test_{hash(description)}.orc"
+
+ write_kwargs: dict[str, Any] = {"compression": compression}
+ if stripe_size is not None:
+ write_kwargs["stripe_size"] = stripe_size
+ if batch_size is not None:
+ write_kwargs["batch_size"] = batch_size
+
+ orc.write_table(data, str(orc_path), **write_kwargs)
+
+ # Analyze the ORC file
+ file_size = orc_path.stat().st_size
+ orc_file = orc.ORCFile(str(orc_path))
+ actual_stripes = orc_file.nstripes
+        stripe_sizes_rows = [orc_file.read_stripe(i).num_rows for i in range(actual_stripes)]
+
+ print(f" File size: {file_size:,} bytes ({file_size / 1024:.1f}KB)")
+ print(f" Stripes: {actual_stripes}")
+ print(f" Rows per stripe: {stripe_sizes_rows}")
+
+ if stripe_size:
+            target_bytes_per_row = stripe_size / stripe_sizes_rows[0] if stripe_sizes_rows else 0
+            print(f"  Target bytes per row (uncompressed): {target_bytes_per_row:.1f} bytes")
+
+ actual_bytes_per_row = file_size / len(data)
+ compression_ratio = raw_size / file_size
+ print(f" Actual bytes per row: {actual_bytes_per_row:.1f} bytes")
+ print(f" Compression ratio: {compression_ratio:.1f}x")
+
+ # Test PyArrow batching
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=file_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=10000,
+ column_sizes={1: 40000, 2: 80000},
+ value_counts={1: 10000, 2: 10000},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"value_1"},
+ upper_bounds={1: b"\x10'\x00\x00", 2: b"value_10000"},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+ scan_task = FileScanTask(data_file=data_file)
+
+ # Test batching behavior
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+
+ batches = list(scan.to_record_batches([scan_task]))
+ batch_sizes = [batch.num_rows for batch in batches]
+
+ print(f" PyArrow batches: {len(batches)}")
+ print(f" Batch sizes: {batch_sizes}")
+ print(f" Batches match stripes: {len(batches) == actual_stripes}")
+ print()
+
+ print("=== Key Insights ===")
Review Comment:
fixed