Fokko commented on code in PR #444: URL: https://github.com/apache/iceberg-python/pull/444#discussion_r1499027177
########## pyiceberg/io/pyarrow.py: ########## @@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata( def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - task = next(tasks) + # task = next(tasks) + + # try: + # _ = next(tasks) + # # If there are more tasks, raise an exception + # raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") + # except StopIteration: + # pass + data_files = [] + for task in tasks: + parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) + + file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' + file_schema = schema_to_pyarrow(table.schema()) + + fo = table.io.new_output(file_path) + row_group_size = PropertyUtil.property_as_int( + properties=table.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, + ) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: + writer.write_table(task.df, row_group_size=row_group_size) + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(fo), + # After this has been fixed: + # https://github.com/apache/iceberg-python/issues/271 + # sort_order_id=task.sort_order_id, + sort_order_id=None, + # Just copy these from the table for now + spec_id=table.spec().spec_id, + equality_ids=None, + key_metadata=None, + ) - try: - _ = next(tasks) - # If there are more tasks, raise an exception - raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") - except StopIteration: - pass + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(table.schema(), table.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + data_files.append(data_file) + return iter(data_files) - parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) - file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' - file_schema = schema_to_pyarrow(table.schema()) +def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]: + # bin-pack the table into 256 MB chunks + from pyiceberg.utils.bin_packing import PackingIterator - fo = table.io.new_output(file_path) - row_group_size = PropertyUtil.property_as_int( - properties=table.properties, - property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, - default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, - ) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: - writer.write_table(task.df, row_group_size=row_group_size) - - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=len(fo), - # After this has been fixed: - # https://github.com/apache/iceberg-python/issues/271 - # sort_order_id=task.sort_order_id, - sort_order_id=None, - # Just copy these from the table for now - spec_id=table.spec().spec_id, - equality_ids=None, - key_metadata=None, - ) - - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(table.schema(), table.properties), - parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), - ) - return iter([data_file]) + splits = tbl.to_batches() + target_weight = 2 << 27 # 256 MB Review Comment: In Java we have the `write.target-file-size-bytes` configuration. In this case, we're looking at the size in memory, and not the file size. Converting this is very tricky since Parquet has some excellent encodings to reduce the size on disk. We might want to check the heuristic on the Java side. On the other end, we also don't want to explode the memory when decoding a Parquet file -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org