rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1051661123


##########
python/tests/io/test_pyarrow.py:
##########
@@ -572,3 +581,388 @@ def test_always_true_to_pyarrow(bound_reference: 
BoundReference[str]) -> None:
 
 def test_always_false_to_pyarrow(bound_reference: BoundReference[str]) -> None:
     assert repr(expression_to_pyarrow(AlwaysFalse())) == 
"<pyarrow.compute.Expression false>"
+
+
+@pytest.fixture
+def schema_int() -> Schema:
+    return Schema(NestedField(1, "id", IntegerType()), schema_id=1)
+
+
+@pytest.fixture
+def schema_str() -> Schema:
+    return Schema(NestedField(2, "data", IntegerType()), schema_id=1)
+
+
+@pytest.fixture
+def schema_long() -> Schema:
+    return Schema(NestedField(3, "id", LongType()), schema_id=1)
+
+
+@pytest.fixture
+def table_int(schema_int: Schema, tmpdir: str) -> str:
+    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_int), 
metadata={"iceberg.schema": schema_int.json()})
+
+    target_file = f"file:{tmpdir}/a.parquet"
+
+    with pq.ParquetWriter(target_file, pyarrow_schema) as writer:
+        writer.write_table(pa.Table.from_arrays([pa.array([0, 1, 2])], 
schema=pyarrow_schema))
+
+    return target_file
+
+
+@pytest.fixture
+def table_str(schema_str: Schema, tmpdir: str) -> str:
+    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_str), 
metadata={"iceberg.schema": schema_str.json()})
+
+    target_file = f"file:{tmpdir}/b.parquet"
+
+    with pq.ParquetWriter(target_file, pyarrow_schema) as writer:
+        writer.write_table(pa.Table.from_arrays([pa.array([0, 1, 2])], 
schema=pyarrow_schema))
+
+    return target_file
+
+
+@pytest.fixture
+def table_long(schema_long: Schema, tmpdir: str) -> str:
+    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_long), 
metadata={"iceberg.schema": schema_long.json()})
+
+    target_file = f"file:{tmpdir}/c.parquet"
+
+    with pq.ParquetWriter(target_file, pyarrow_schema) as writer:
+        writer.write_table(pa.Table.from_arrays([pa.array([0, 1, 2])], 
schema=pyarrow_schema))
+
+    return target_file
+
+
+def test_projection_add_column(schema_int: Schema, table_int: str) -> None:
+    schema = Schema(
+        # All new IDs
+        NestedField(10, "id", IntegerType(), required=False),
+        NestedField(20, "list", ListType(21, IntegerType(), 
element_required=False), required=False),
+        NestedField(
+            30,
+            "map",
+            MapType(key_id=31, key_type=IntegerType(), value_id=32, 
value_type=StringType(), value_required=False),
+            required=False,
+        ),
+        NestedField(40, "location", StructType(NestedField(41, "lat", 
DoubleType()), NestedField(42, "lon", DoubleType()))),
+    )
+    result_table = project_table(
+        [
+            FileScanTask(
+                DataFile(file_path=table_int, file_format=FileFormat.PARQUET, 
partition={}, record_count=3, file_size_in_bytes=3)
+            )
+        ],
+        Table(
+            ("namespace", "table"),
+            metadata=TableMetadataV2(
+                location="file://a/b/c.parquet",
+                last_column_id=1,
+                format_version=2,
+                schemas=[schema],
+                partition_specs=[PartitionSpec()],
+            ),
+            metadata_location="file://a/b/c.parquet",
+            io=PyArrowFileIO(),
+        ),
+        AlwaysTrue(),
+        schema,
+        case_sensitive=True,
+    )
+
+    # Everything should be None
+    for col in result_table.columns:
+        for r in col:
+            assert r.as_py() is None
+
+    assert (
+        repr(result_table.schema)
+        == """id: int32
+list: list<item: int32>
+  child 0, item: int32
+map: map<int32, string>
+  child 0, entries: struct<key: int32 not null, value: string> not null
+      child 0, key: int32 not null
+      child 1, value: string
+location: struct<lat: double not null, lon: double not null> not null
+  child 0, lat: double not null
+  child 1, lon: double not null"""
+    )
+
+
+def test_projection_add_column_struct(schema_int: Schema, table_int: str) -> 
None:
+    schema = Schema(
+        # A new ID
+        NestedField(
+            2,
+            "other_id",
+            MapType(key_id=3, key_type=IntegerType(), value_id=4, 
value_type=StringType(), value_required=False),
+            required=False,
+        )
+    )
+    result_table = project_table(
+        [
+            FileScanTask(
+                DataFile(file_path=table_int, file_format=FileFormat.PARQUET, 
partition={}, record_count=3, file_size_in_bytes=3)
+            )
+        ],
+        Table(
+            ("namespace", "table"),
+            metadata=TableMetadataV2(
+                location="file://a/b/c.parquet",
+                last_column_id=1,
+                format_version=2,
+                schemas=[schema],
+                partition_specs=[PartitionSpec()],
+            ),
+            metadata_location="file://a/b/c.parquet",
+            io=PyArrowFileIO(),
+        ),
+        AlwaysTrue(),
+        schema,
+        case_sensitive=True,
+    )
+    # Everything should be None
+    for r in result_table.columns[0]:
+        assert r.as_py() is None
+
+    assert (
+        repr(result_table.schema)
+        == """other_id: map<int32, string>
+  child 0, entries: struct<key: int32 not null, value: string> not null
+      child 0, key: int32 not null
+      child 1, value: string"""
+    )
+
+
+def test_projection_rename_column(schema_int: Schema, table_int: str) -> None:
+    schema = Schema(
+        # Reuses the id 1
+        NestedField(1, "other_id", IntegerType())
+    )
+    result_table = project_table(
+        [
+            FileScanTask(
+                DataFile(file_path=table_int, file_format=FileFormat.PARQUET, 
partition={}, record_count=3, file_size_in_bytes=3)
+            )
+        ],
+        Table(
+            ("namespace", "table"),
+            metadata=TableMetadataV2(
+                location="file://a/b/c.parquet",
+                last_column_id=1,
+                format_version=2,
+                schemas=[schema],
+                partition_specs=[PartitionSpec()],
+            ),
+            metadata_location="file://a/b/c.parquet",
+            io=PyArrowFileIO(),
+        ),
+        AlwaysTrue(),
+        schema,
+        case_sensitive=True,
+    )
+    for idx, r in enumerate(result_table.columns[0]):
+        assert r.as_py() == idx
+
+    assert repr(result_table.schema) == "other_id: int32 not null"
+
+
+def test_projection_concat_files(schema_int: Schema, table_int: str) -> None:
+    result_table = project_table(
+        [
+            FileScanTask(
+                DataFile(file_path=table_int, file_format=FileFormat.PARQUET, 
partition={}, record_count=3, file_size_in_bytes=3)
+            ),
+        ]
+        * 2,
+        Table(
+            ("namespace", "table"),
+            metadata=TableMetadataV2(
+                location="file://a/b/c.parquet",
+                last_column_id=1,
+                format_version=2,
+                schemas=[schema_int],
+                partition_specs=[PartitionSpec()],
+                current_schema_id=1,
+            ),
+            metadata_location="file://a/b/c.parquet",
+            io=PyArrowFileIO(),
+        ),
+        AlwaysTrue(),
+        schema_int,
+        case_sensitive=True,
+    )
+    for idx, r in enumerate(result_table.columns[0]):
+        assert r.as_py() == idx % 3
+    assert len(result_table.columns[0]) == 6
+    assert repr(result_table.schema) == "id: int32 not null"
+
+
+def test_projection_filter(schema_int: Schema, table_int: str) -> None:
+    result_table = project_table(
+        [
+            FileScanTask(
+                DataFile(file_path=table_int, file_format=FileFormat.PARQUET, 
partition={}, record_count=3, file_size_in_bytes=3)
+            ),
+        ]
+        * 2,
+        Table(
+            ("namespace", "table"),
+            metadata=TableMetadataV2(
+                location="file://a/b/c.parquet",
+                last_column_id=1,
+                format_version=2,
+                schemas=[schema_int],
+                partition_specs=[PartitionSpec()],
+                current_schema_id=1,
+            ),
+            metadata_location="file://a/b/c.parquet",
+            io=PyArrowFileIO(),
+        ),
+        GreaterThan("id", 4),
+        schema_int,
+        case_sensitive=True,
+    )
+    assert len(result_table.columns[0]) == 0

Review Comment:
   Why are there no columns? The buffer should still have the same structure;
   it should just be empty, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to