ConeyLiu commented on code in PR #1427:

@@ -873,3 +874,76 @@ def test_table_scan_empty_table(catalog: Catalog) -> None:
     result_table = tbl.scan().to_arrow()
     assert len(result_table) == 0
+def test_plan_tasks(session_catalog: Catalog) -> None:
+    from pyiceberg.table import TableProperties
+    table_name = "default.test_plan_tasks"
+    try:
+        session_catalog.drop_table(table_name)
+    except NoSuchTableError:
+        pass  # Just to make sure that the table doesn't exist
+    tbl = session_catalog.create_table(
+        table_name,
+        Schema(
+            NestedField(1, "number", LongType()),
+        ),
+        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
+    )
+    # Write 10 row groups, that should end up as 10 batches
+    entries = 10
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {
+                    "number": number,
+                }
+                for number in range(entries)
+            ],
+        )
+    )
+    assert len(tbl.inspect.files()) == 1
+    plan_files = list(tbl.scan().plan_files())
+    assert len(plan_files) == 1
+    data_file = plan_files[0].file
+    assert data_file.split_offsets is not None and 
len(data_file.split_offsets) == 10
+    plan_tasks = list(tbl.scan(options={TableProperties.READ_SPLIT_SIZE: 
+    assert len(plan_tasks) == 10
+    split_offsets = []
+    for task in plan_tasks:
+        assert len(task.tasks) == 1
+        split_offsets.append(task.tasks[0].start)
+    assert split_offsets == plan_files[0].file.split_offsets
+    split_sizes = []
+    for i in range(1, len(data_file.split_offsets)):
+        split_sizes.append(data_file.split_offsets[i] - 
data_file.split_offsets[i - 1])
+    split_sizes.append(data_file.file_size_in_bytes - 
+    read_split_size = int(data_file.file_size_in_bytes / 4)
+    read_split_open_file_cost = 1
+    read_split_lookback = 5
+    plan_tasks = list(

Review Comment:
   @kevinjqliu I added more tests here; please take a look and let me know if you have any further advice.

