kevinjqliu commented on code in PR #444:
URL: https://github.com/apache/iceberg-python/pull/444#discussion_r1499783029


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
+    # task = next(tasks)
+
+    # try:
+    #     _ = next(tasks)
+    #     # If there are more tasks, raise an exception
+    #     raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    # except StopIteration:
+    #     pass
+    data_files = []
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        file_schema = schema_to_pyarrow(table.schema())
+
+        fo = table.io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
 
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-    raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(table.schema(), table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
 
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = schema_to_pyarrow(table.schema())
+def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
 
-    fo = table.io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
-
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(table.schema(), table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
-    )
-    return iter([data_file])
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB

Review Comment:
   I think this is done in Java like so: https://github.com/apache/iceberg-python/issues/428#issuecomment-1953026131
   
   The write is done row by row, and every 1000 rows the file size written so far is checked against the desired size.
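   
   A rough sketch of that Java-style rolling write in pyarrow terms, for illustration only (not this PR's implementation; `ROW_CHECK_INTERVAL`, `TARGET_FILE_SIZE_BYTES` and `new_sink` are made-up names):
   
   ```python
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   ROW_CHECK_INTERVAL = 1000           # rows written between size checks (assumed)
   TARGET_FILE_SIZE_BYTES = 512 << 20  # hypothetical target size, 512 MB
   
   
   def write_rolling(table: pa.Table, new_sink) -> list:
       """Write `table` in row slices, rolling to a new file once the bytes
       flushed so far exceed the target. `new_sink()` is a hypothetical
       callable returning (path, writable binary file object)."""
       paths, writer, sink, path = [], None, None, None
       for offset in range(0, table.num_rows, ROW_CHECK_INTERVAL):
           if writer is None:
               path, sink = new_sink()
               writer = pq.ParquetWriter(sink, schema=table.schema)
           writer.write_table(table.slice(offset, ROW_CHECK_INTERVAL))
           if sink.tell() >= TARGET_FILE_SIZE_BYTES:  # the every-N-rows size check
               writer.close()
               sink.close()
               paths.append(path)
               writer = None
       if writer is not None:
           writer.close()
           sink.close()
           paths.append(path)
       return paths
   ```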
   
   



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
+    # task = next(tasks)
+
+    # try:
+    #     _ = next(tasks)
+    #     # If there are more tasks, raise an exception
+    #     raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    # except StopIteration:
+    #     pass
+    data_files = []
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        file_schema = schema_to_pyarrow(table.schema())
+
+        fo = table.io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
 
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-    raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(table.schema(), table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
 
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = schema_to_pyarrow(table.schema())
+def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
 
-    fo = table.io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
-
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(table.schema(), table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
-    )
-    return iter([data_file])
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB

Review Comment:
   and the `write.target-file-size-bytes` configuration is just a target to aim for, not the absolute size of the resulting file.
   
   Based on [this comment](https://github.com/apache/iceberg/issues/8729#issuecomment-1750041439), it seems that even in Spark the resulting Parquet files can be smaller than the target file size.
   
   
   



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
+    # task = next(tasks)
+
+    # try:
+    #     _ = next(tasks)
+    #     # If there are more tasks, raise an exception
+    #     raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    # except StopIteration:
+    #     pass
+    data_files = []
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        file_schema = schema_to_pyarrow(table.schema())
+
+        fo = table.io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
 
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-    raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(table.schema(), table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
 
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = schema_to_pyarrow(table.schema())
+def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
 
-    fo = table.io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
-
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(table.schema(), table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
-    )
-    return iter([data_file])
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB

Review Comment:
   For now, I propose we reuse the `write.target-file-size-bytes` option and default to 512 MB of Arrow in-memory size, as sketched below.
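   
   For illustration, the lookup could mirror the existing row-group-size lookup in `write_file`; the property constants don't exist in the PR yet, so this only sketches the shape:
   
   ```python
   # Hypothetical lookup; "write.target-file-size-bytes" is the property name
   # proposed above and 512 MB the proposed default, not existing constants.
   target_file_size = PropertyUtil.property_as_int(
       properties=table.properties,
       property_name="write.target-file-size-bytes",
       default=512 * 1024 * 1024,
   )
   bins = bin_pack_arrow_table(task.df, target_file_size)  # assumes the function grows a target parameter
   ```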



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
+    # task = next(tasks)
+
+    # try:
+    #     _ = next(tasks)
+    #     # If there are more tasks, raise an exception
+    #     raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    # except StopIteration:
+    #     pass
+    data_files = []
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        file_schema = schema_to_pyarrow(table.schema())
+
+        fo = table.io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
 
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-    raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(table.schema(), table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
 
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = schema_to_pyarrow(table.schema())
+def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
 
-    fo = table.io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
-
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(table.schema(), table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
-    )
-    return iter([data_file])
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB

Review Comment:
   Here's a [test run](https://github.com/apache/iceberg-python/issues/428#issuecomment-1953010526) where we bin-packed 685.46 MB of Arrow memory into 256 MB chunks. We ended up with three ~80 MB Parquet files.
   
   



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
+    # task = next(tasks)
+
+    # try:
+    #     _ = next(tasks)
+    #     # If there are more tasks, raise an exception
+    #     raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    # except StopIteration:
+    #     pass
+    data_files = []
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        file_schema = schema_to_pyarrow(table.schema())
+
+        fo = table.io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
 
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-    raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(table.schema(), table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
 
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = schema_to_pyarrow(table.schema())
+def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
 
-    fo = table.io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
-
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(table.schema(), table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
-    )
-    return iter([data_file])
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB
+    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
+    return [pa.Table.from_batches(bin) for bin in bin_packed]

Review Comment:
   Sounds good. I did this so that I can change `WriteTask` and `write_file` as little as possible.
   
   I'll make the change to use `RecordBatch` instead; a possible shape is sketched below.
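   
   One possible shape for that `RecordBatch`-based version, just to sketch the direction (not the final code): yield each packed bin as a list of record batches and let the caller decide when to materialize a `pa.Table`.
   
   ```python
   from typing import List
   
   import pyarrow as pa
   
   from pyiceberg.utils.bin_packing import PackingIterator
   
   
   def bin_pack_arrow_table(tbl: pa.Table, target_weight: int) -> List[List[pa.RecordBatch]]:
       splits = tbl.to_batches()
       # Each bin is a list of RecordBatches whose combined in-memory size
       # approximates target_weight; no intermediate pa.Table is built here.
       return list(
           PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
       )
   ```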


