Fokko commented on code in PR #444:
URL: https://github.com/apache/iceberg-python/pull/444#discussion_r1499027177


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
+    # task = next(tasks)
+
+    # try:
+    #     _ = next(tasks)
+    #     # If there are more tasks, raise an exception
+    #     raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    # except StopIteration:
+    #     pass
+    data_files = []
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        file_schema = schema_to_pyarrow(table.schema())
+
+        fo = table.io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
 
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(table.schema(), table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
 
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = schema_to_pyarrow(table.schema())
+def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
 
-    fo = table.io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
-
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(table.schema(), table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
-    )
-    return iter([data_file])
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB

Review Comment:
   In Java we have the `write.target-file-size-bytes` configuration. In this case, we're looking at the size in memory, not the file size. Converting between the two is very tricky, since Parquet has some excellent encodings that reduce the size on disk. We might want to check the heuristic on the Java side. On the other hand, we also don't want to explode the memory when decoding a Parquet file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

