cbb330 commented on code in PR #2432:
URL: https://github.com/apache/iceberg-python/pull/2432#discussion_r2329377465
##########
tests/io/test_pyarrow.py:
##########
@@ -2811,3 +2854,641 @@ def test_parse_location_defaults() -> None:
assert scheme == "hdfs"
assert netloc == "netloc:8000"
assert path == "/foo/bar"
+
+
+def test_write_and_read_orc(tmp_path):
+ """Test basic ORC write and read functionality."""
+    import pyarrow.orc as orc
+
+    # Create a simple Arrow table
+    data = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
+ orc_path = tmp_path / 'test.orc'
+ orc.write_table(data, str(orc_path))
+ # Read it back
+ orc_file = orc.ORCFile(str(orc_path))
+ table_read = orc_file.read()
+ assert table_read.equals(data)
+
+
+def test_orc_file_format_integration(tmp_path):
+ """Test ORC file format integration with PyArrow dataset API."""
+    # This test mimics a minimal integration with PyIceberg's FileFormat enum and pyarrow.orc
+    from pyiceberg.manifest import FileFormat
+    import pyarrow.dataset as ds
+    import pyarrow.orc as orc
+ data = pa.table({'a': [10, 20], 'b': ['foo', 'bar']})
+ orc_path = tmp_path / 'iceberg.orc'
+ orc.write_table(data, str(orc_path))
+ # Use PyArrow dataset API to read as ORC
+ dataset = ds.dataset(str(orc_path), format=ds.OrcFileFormat())
+ table_read = dataset.to_table()
+ assert table_read.equals(data)
+
+
+def test_iceberg_read_orc(tmp_path):
+ """
+ Integration test: Read ORC files via Iceberg API.
+ To run just this test:
+ pytest tests/io/test_pyarrow.py -k test_iceberg_read_orc
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+ from pyiceberg.schema import Schema, NestedField
+ from pyiceberg.types import IntegerType, StringType
+ from pyiceberg.manifest import FileFormat, DataFileContent
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.io.pyarrow import PyArrowFileIO, ArrowScan
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.expressions import AlwaysTrue
+
+ # Define schema and data
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "name", StringType(), required=False),
+ )
+    data = pa.table({"id": pa.array([1, 2, 3], type=pa.int32()), "name": ["a", "b", "c"]})
+
+ # Create ORC file directly using PyArrow
+ orc_path = tmp_path / "test_data.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "write.format.default": "parquet",  # This doesn't matter for reading
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["name"]}]',  # Add name mapping for ORC files without field IDs
+ }
+ )
+ io = PyArrowFileIO()
+
+ # Create a DataFile pointing to the ORC file
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=3,
+ column_sizes={1: 12, 2: 12}, # Approximate sizes
+ value_counts={1: 3, 2: 3},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"a"}, # Approximate bounds
+ upper_bounds={1: b"\x03\x00\x00\x00", 2: b"c"},
+ split_offsets=None,
+ )
+ # Ensure spec_id is properly set
+ data_file.spec_id = 0
+
+ # Read back using ArrowScan
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=AlwaysTrue(),
+ case_sensitive=True,
+ )
+ scan_task = FileScanTask(data_file=data_file)
+ table_read = scan.to_table([scan_task])
+
+ # Compare data ignoring schema metadata (like not null constraints)
+ assert table_read.num_rows == data.num_rows
+ assert table_read.num_columns == data.num_columns
+ assert table_read.column_names == data.column_names
+
+ # Compare actual column data values
+ for col_name in data.column_names:
+        assert table_read.column(col_name).to_pylist() == data.column(col_name).to_pylist()
+
+
+def test_orc_row_filtering_predicate_pushdown(tmp_path):
+ """
+ Test ORC row filtering and predicate pushdown functionality.
+ To run just this test:
+        pytest tests/io/test_pyarrow.py -k test_orc_row_filtering_predicate_pushdown
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+ from pyiceberg.schema import Schema, NestedField
+ from pyiceberg.types import IntegerType, StringType, BooleanType
+ from pyiceberg.manifest import FileFormat, DataFileContent
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.io.pyarrow import PyArrowFileIO, ArrowScan
+ from pyiceberg.table import FileScanTask
+    from pyiceberg.expressions import EqualTo, GreaterThan, LessThan, And, Or, In
+
+ # Define schema and data with more complex data for filtering
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "name", StringType(), required=False),
+ NestedField(3, "age", IntegerType(), required=True),
+ NestedField(4, "active", BooleanType(), required=True),
+ )
+
+ # Create data with various values for filtering
+ data = pa.table({
+ "id": pa.array([1, 2, 3, 4, 5], type=pa.int32()),
+ "name": ["alice", "bob", "charlie", "david", "eve"],
+ "age": pa.array([25, 30, 35, 40, 45], type=pa.int32()),
+ "active": [True, False, True, True, False]
+ })
+
+ # Create ORC file
+ orc_path = tmp_path / "filter_test.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=4,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["name"]}, {"field-id": 3, "names": ["age"]}, {"field-id": 4, "names": ["active"]}]',
+ }
+ )
+ io = PyArrowFileIO()
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=5,
+ column_sizes={1: 20, 2: 50, 3: 20, 4: 10},
+ value_counts={1: 5, 2: 5, 3: 5, 4: 5},
+ null_value_counts={1: 0, 2: 0, 3: 0, 4: 0},
+ nan_value_counts={1: 0, 2: 0, 3: 0, 4: 0},
+        lower_bounds={1: b"\x01\x00\x00\x00", 2: b"alice", 3: b"\x19\x00\x00\x00", 4: b"\x00"},
+        upper_bounds={1: b"\x05\x00\x00\x00", 2: b"eve", 3: b"\x2d\x00\x00\x00", 4: b"\x01"},
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+
+ scan_task = FileScanTask(data_file=data_file)
+
+ # Test 1: Simple equality filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=EqualTo("id", 3),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 1
+ assert result.column("id").to_pylist() == [3]
+ assert result.column("name").to_pylist() == ["charlie"]
+
+ # Test 2: Range filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=And(GreaterThan("age", 30), LessThan("age", 45)),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 2
+ assert set(result.column("id").to_pylist()) == {3, 4}
+
+ # Test 3: String filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=EqualTo("name", "bob"),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 1
+ assert result.column("name").to_pylist() == ["bob"]
+
+ # Test 4: Boolean filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=EqualTo("active", True),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 3
+ assert set(result.column("id").to_pylist()) == {1, 3, 4}
+
+ # Test 5: IN filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=In("id", [1, 3, 5]),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 3
+ assert set(result.column("id").to_pylist()) == {1, 3, 5}
+
+ # Test 6: Complex AND/OR filter
+ scan = ArrowScan(
+ table_metadata=table_metadata,
+ io=io,
+ projected_schema=schema,
+ row_filter=Or(
+ And(EqualTo("active", True), GreaterThan("age", 30)),
+ EqualTo("name", "bob")
+ ),
+ case_sensitive=True,
+ )
+ result = scan.to_table([scan_task])
+ assert result.num_rows == 3
+    assert set(result.column("id").to_pylist()) == {2, 3, 4}  # bob, charlie, david
+
+
+def test_orc_record_batching_streaming(tmp_path):
+ """
+ Test ORC record batching and streaming functionality.
+ To run just this test:
+ pytest tests/io/test_pyarrow.py -k test_orc_record_batching_streaming
+ """
+ import pyarrow as pa
+ import pyarrow.orc as orc
+ from pyiceberg.schema import Schema, NestedField
+ from pyiceberg.types import IntegerType, StringType
+ from pyiceberg.manifest import FileFormat, DataFileContent
+ from pyiceberg.table.metadata import TableMetadataV2
+ from pyiceberg.partitioning import PartitionSpec
+ from pyiceberg.io.pyarrow import PyArrowFileIO, ArrowScan
+ from pyiceberg.table import FileScanTask
+ from pyiceberg.expressions import AlwaysTrue
+
+ # Define schema and create larger dataset for batching
+ schema = Schema(
+ NestedField(1, "id", IntegerType(), required=True),
+ NestedField(2, "value", StringType(), required=False),
+ )
+
+ # Create larger dataset to test batching
+ num_rows = 1000
+ data = pa.table({
+ "id": pa.array(range(1, num_rows + 1), type=pa.int32()),
+ "value": [f"value_{i}" for i in range(1, num_rows + 1)]
+ })
+
+ # Create ORC file
+ orc_path = tmp_path / "batch_test.orc"
+ orc.write_table(data, str(orc_path))
+
+ # Create table metadata
+ table_metadata = TableMetadataV2(
+ location=f"file://{tmp_path}/test_location",
+ last_column_id=2,
+ format_version=2,
+ schemas=[schema],
+ partition_specs=[PartitionSpec()],
+ properties={
+            "schema.name-mapping.default": '[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["value"]}]',
+ }
+ )
+ io = PyArrowFileIO()
+
+ # Create DataFile
+ from pyiceberg.manifest import DataFile
+ from pyiceberg.typedef import Record
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=str(orc_path),
+ file_format=FileFormat.ORC,
+ partition=Record(),
+ file_size_in_bytes=orc_path.stat().st_size,
+ sort_order_id=None,
+ spec_id=0,
+ equality_ids=None,
+ key_metadata=None,
+ record_count=num_rows,
+ column_sizes={1: 4000, 2: 8000},
+ value_counts={1: num_rows, 2: num_rows},
+ null_value_counts={1: 0, 2: 0},
+ nan_value_counts={1: 0, 2: 0},
+ lower_bounds={1: b"\x01\x00\x00\x00", 2: b"value_1"},
+        upper_bounds={1: b"\xe8\x03\x00\x00", 2: b"value_999"},  # "value_999" is the lexicographic max of value_1..value_1000
+ split_offsets=None,
+ )
+ data_file.spec_id = 0
+
+ scan_task = FileScanTask(data_file=data_file)
+
+ # Test 1: Record batching - verify we get multiple batches
Review Comment:
This doesn't seem to test multiple batches. Can we verify multiple batches here?
I ask because the number of batches would be relative to the number of file scan tasks plus ORC fragments, and I'm hoping the batch count equals the number of files * the expected ORC fragments per file, as an e2e validation.
btw, maybe ^ deserves its own test.
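Roughly what that separate test could look like; this is an untested sketch that reuses `data`, `num_rows`, `schema`, `table_metadata`, and `io` from the batching test above, assumes `ArrowScan.to_record_batches` is available for this, and uses hypothetical names (`num_files`, `expected_fragments_per_file`, `data_files`) for the pieces that don't exist yet:

```python
# Untested sketch: one single-stripe ORC file per scan task, so the expectation is
# batch count == num_files * expected_fragments_per_file. If Arrow splits a stripe
# into smaller batches, that's exactly the behavior this test should pin down.
num_files = 2                    # hypothetical: write two small ORC files
expected_fragments_per_file = 1  # hypothetical: one stripe per 1000-row file

data_files = []
for i in range(num_files):
    path = tmp_path / f"batch_test_{i}.orc"
    orc.write_table(data, str(path))
    data_files.append(
        DataFile.from_args(
            content=DataFileContent.DATA,
            file_path=str(path),
            file_format=FileFormat.ORC,
            partition=Record(),
            file_size_in_bytes=path.stat().st_size,
            record_count=num_rows,
            spec_id=0,
        )
    )

scan = ArrowScan(
    table_metadata=table_metadata,
    io=io,
    projected_schema=schema,
    row_filter=AlwaysTrue(),
    case_sensitive=True,
)
tasks = [FileScanTask(data_file=df) for df in data_files]  # one task per ORC file
batches = list(scan.to_record_batches(tasks))

# e2e validation: batch count tracks files * fragments, and no rows are dropped
assert len(batches) == num_files * expected_fragments_per_file
assert sum(b.num_rows for b in batches) == num_files * num_rows
```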
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]