Fokko commented on code in PR #444:
URL: https://github.com/apache/iceberg-python/pull/444#discussion_r1499000219


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
+    # task = next(tasks)
+
+    # try:
+    #     _ = next(tasks)
+    #     # If there are more tasks, raise an exception
+    #     raise NotImplementedError("Only unpartitioned writes are supported: 
https://github.com/apache/iceberg-python/issues/208";)
+    # except StopIteration:
+    #     pass
+    data_files = []
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        file_schema = schema_to_pyarrow(table.schema())
+
+        fo = table.io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
 
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-        raise NotImplementedError("Only unpartitioned writes are supported: 
https://github.com/apache/iceberg-python/issues/208";)
-    except StopIteration:
-        pass
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(table.schema(), table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
 
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = schema_to_pyarrow(table.schema())
+def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
 
-    fo = table.io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
-
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(table.schema(), table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
-    )
-    return iter([data_file])
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB
+    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
+    return [pa.Table.from_batches(bin) for bin in bin_packed]
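
For context, a minimal usage sketch of the `bin_pack_arrow_table` helper added above; the demo table is hypothetical and the import path is assumed from this diff:

```python
# Hypothetical usage of bin_pack_arrow_table from the diff above. The demo
# table is tiny compared to the 256 MB target, so it is expected to come
# back as a single chunk; the row-count check holds either way.
import pyarrow as pa

from pyiceberg.io.pyarrow import bin_pack_arrow_table  # import path assumed from this diff

demo = pa.table({"id": list(range(10_000)), "value": [float(i) for i in range(10_000)]})

chunks = bin_pack_arrow_table(demo)

# The packed chunks together must hold exactly the original rows.
assert sum(chunk.num_rows for chunk in chunks) == demo.num_rows
print(f"packed {demo.nbytes} bytes into {len(chunks)} chunk(s)")
```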

Review Comment:
   I don't think we need to create a table from it. We can also write the batches themselves: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html#pyarrow.parquet.ParquetWriter.write_batch
   
   Idk if there is any overhead in creating a table, but if it isn't needed, I think it is better to avoid it 👍
   
   ```suggestion
       return bin_packed
   ```
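   
   To sketch the idea (untested; the `write_bins` name, the one-file-per-bin layout, and the paths are placeholders, not this PR's code), writing the packed batches directly could look like:
   
   ```python
   from typing import List
   
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   
   def write_bins(bins: List[List[pa.RecordBatch]], schema: pa.Schema, prefix: str) -> None:
       # One Parquet file per bin; each RecordBatch is written directly,
       # skipping the intermediate pa.Table.from_batches call.
       for i, batches in enumerate(bins):
           with pq.ParquetWriter(f"{prefix}-{i}.parquet", schema=schema) as writer:
               for batch in batches:
                   writer.write_batch(batch)
   ```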



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

