kevinjqliu commented on code in PR #444: URL: https://github.com/apache/iceberg-python/pull/444#discussion_r1499783029
########## pyiceberg/io/pyarrow.py: ########## @@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata( def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - task = next(tasks) + # task = next(tasks) + + # try: + # _ = next(tasks) + # # If there are more tasks, raise an exception + # raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") + # except StopIteration: + # pass + data_files = [] + for task in tasks: + parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) + + file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' + file_schema = schema_to_pyarrow(table.schema()) + + fo = table.io.new_output(file_path) + row_group_size = PropertyUtil.property_as_int( + properties=table.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, + ) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: + writer.write_table(task.df, row_group_size=row_group_size) + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(fo), + # After this has been fixed: + # https://github.com/apache/iceberg-python/issues/271 + # sort_order_id=task.sort_order_id, + sort_order_id=None, + # Just copy these from the table for now + spec_id=table.spec().spec_id, + equality_ids=None, + key_metadata=None, + ) - try: - _ = next(tasks) - # If there are more tasks, raise an exception - raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") - except StopIteration: - pass + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(table.schema(), table.properties), + 
parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + data_files.append(data_file) + return iter(data_files) - parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) - file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' - file_schema = schema_to_pyarrow(table.schema()) +def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]: + # bin-pack the table into 256 MB chunks + from pyiceberg.utils.bin_packing import PackingIterator - fo = table.io.new_output(file_path) - row_group_size = PropertyUtil.property_as_int( - properties=table.properties, - property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, - default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, - ) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: - writer.write_table(task.df, row_group_size=row_group_size) - - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=len(fo), - # After this has been fixed: - # https://github.com/apache/iceberg-python/issues/271 - # sort_order_id=task.sort_order_id, - sort_order_id=None, - # Just copy these from the table for now - spec_id=table.spec().spec_id, - equality_ids=None, - key_metadata=None, - ) - - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(table.schema(), table.properties), - parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), - ) - return iter([data_file]) + splits = tbl.to_batches() + target_weight = 2 << 27 # 256 MB Review Comment: I think this is done in Java like so: https://github.com/apache/iceberg-python/issues/428#issuecomment-1953026131 Rows are written one at a time, and after every 1000 rows the file size is checked against the desired size.
########## pyiceberg/io/pyarrow.py: ########## @@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata( def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - task = next(tasks) + # task = next(tasks) + + # try: + # _ = next(tasks) + # # If there are more tasks, raise an exception + # raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") + # except StopIteration: + # pass + data_files = [] + for task in tasks: + parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) + + file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' + file_schema = schema_to_pyarrow(table.schema()) + + fo = table.io.new_output(file_path) + row_group_size = PropertyUtil.property_as_int( + properties=table.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, + ) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: + writer.write_table(task.df, row_group_size=row_group_size) + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(fo), + # After this has been fixed: + # https://github.com/apache/iceberg-python/issues/271 + # sort_order_id=task.sort_order_id, + sort_order_id=None, + # Just copy these from the table for now + spec_id=table.spec().spec_id, + equality_ids=None, + key_metadata=None, + ) - try: - _ = next(tasks) - # If there are more tasks, raise an exception - raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") - except StopIteration: - pass + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(table.schema(), table.properties), + 
parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + data_files.append(data_file) + return iter(data_files) - parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) - file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' - file_schema = schema_to_pyarrow(table.schema()) +def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]: + # bin-pack the table into 256 MB chunks + from pyiceberg.utils.bin_packing import PackingIterator - fo = table.io.new_output(file_path) - row_group_size = PropertyUtil.property_as_int( - properties=table.properties, - property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, - default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, - ) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: - writer.write_table(task.df, row_group_size=row_group_size) - - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=len(fo), - # After this has been fixed: - # https://github.com/apache/iceberg-python/issues/271 - # sort_order_id=task.sort_order_id, - sort_order_id=None, - # Just copy these from the table for now - spec_id=table.spec().spec_id, - equality_ids=None, - key_metadata=None, - ) - - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(table.schema(), table.properties), - parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), - ) - return iter([data_file]) + splits = tbl.to_batches() + target_weight = 2 << 27 # 256 MB Review Comment: Also, the `write.target-file-size-bytes` configuration is only a heuristic target to aim for, not the exact size of the resulting file.
Based on [this comment](https://github.com/apache/iceberg/issues/8729#issuecomment-1750041439), it seems that even in Spark result parquet files can be smaller than the target file size. ########## pyiceberg/io/pyarrow.py: ########## @@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata( def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - task = next(tasks) + # task = next(tasks) + + # try: + # _ = next(tasks) + # # If there are more tasks, raise an exception + # raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") + # except StopIteration: + # pass + data_files = [] + for task in tasks: + parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) + + file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' + file_schema = schema_to_pyarrow(table.schema()) + + fo = table.io.new_output(file_path) + row_group_size = PropertyUtil.property_as_int( + properties=table.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, + ) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: + writer.write_table(task.df, row_group_size=row_group_size) + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(fo), + # After this has been fixed: + # https://github.com/apache/iceberg-python/issues/271 + # sort_order_id=task.sort_order_id, + sort_order_id=None, + # Just copy these from the table for now + spec_id=table.spec().spec_id, + equality_ids=None, + key_metadata=None, + ) - try: - _ = next(tasks) - # If there are more tasks, raise an exception - raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") - except StopIteration: - 
pass + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(table.schema(), table.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + data_files.append(data_file) + return iter(data_files) - parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) - file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' - file_schema = schema_to_pyarrow(table.schema()) +def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]: + # bin-pack the table into 256 MB chunks + from pyiceberg.utils.bin_packing import PackingIterator - fo = table.io.new_output(file_path) - row_group_size = PropertyUtil.property_as_int( - properties=table.properties, - property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, - default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, - ) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: - writer.write_table(task.df, row_group_size=row_group_size) - - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=len(fo), - # After this has been fixed: - # https://github.com/apache/iceberg-python/issues/271 - # sort_order_id=task.sort_order_id, - sort_order_id=None, - # Just copy these from the table for now - spec_id=table.spec().spec_id, - equality_ids=None, - key_metadata=None, - ) - - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(table.schema(), table.properties), - parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), - ) - return iter([data_file]) + splits = tbl.to_batches() + target_weight = 2 << 27 # 256 MB Review Comment: For now, I propose we reuse the `write.target-file-size-bytes` option and default to 512MB of arrow 
size in memory. ########## pyiceberg/io/pyarrow.py: ########## @@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata( def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - task = next(tasks) + # task = next(tasks) + + # try: + # _ = next(tasks) + # # If there are more tasks, raise an exception + # raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") + # except StopIteration: + # pass + data_files = [] + for task in tasks: + parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) + + file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' + file_schema = schema_to_pyarrow(table.schema()) + + fo = table.io.new_output(file_path) + row_group_size = PropertyUtil.property_as_int( + properties=table.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, + ) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: + writer.write_table(task.df, row_group_size=row_group_size) + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(fo), + # After this has been fixed: + # https://github.com/apache/iceberg-python/issues/271 + # sort_order_id=task.sort_order_id, + sort_order_id=None, + # Just copy these from the table for now + spec_id=table.spec().spec_id, + equality_ids=None, + key_metadata=None, + ) - try: - _ = next(tasks) - # If there are more tasks, raise an exception - raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") - except StopIteration: - pass + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(table.schema(), 
table.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + data_files.append(data_file) + return iter(data_files) - parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) - file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' - file_schema = schema_to_pyarrow(table.schema()) +def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]: + # bin-pack the table into 256 MB chunks + from pyiceberg.utils.bin_packing import PackingIterator - fo = table.io.new_output(file_path) - row_group_size = PropertyUtil.property_as_int( - properties=table.properties, - property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, - default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, - ) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: - writer.write_table(task.df, row_group_size=row_group_size) - - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=len(fo), - # After this has been fixed: - # https://github.com/apache/iceberg-python/issues/271 - # sort_order_id=task.sort_order_id, - sort_order_id=None, - # Just copy these from the table for now - spec_id=table.spec().spec_id, - equality_ids=None, - key_metadata=None, - ) - - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(table.schema(), table.properties), - parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), - ) - return iter([data_file]) + splits = tbl.to_batches() + target_weight = 2 << 27 # 256 MB Review Comment: Here's a [test run](https://github.com/apache/iceberg-python/issues/428#issuecomment-1953010526) when we bin-packed 685.46 MB of arrow memory into 256MB chunks. We ended up with 3 ~80MB parquet files. 
########## pyiceberg/io/pyarrow.py: ########## @@ -1715,53 +1715,65 @@ def fill_parquet_file_metadata( def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - task = next(tasks) + # task = next(tasks) + + # try: + # _ = next(tasks) + # # If there are more tasks, raise an exception + # raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") + # except StopIteration: + # pass + data_files = [] + for task in tasks: + parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) + + file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' + file_schema = schema_to_pyarrow(table.schema()) + + fo = table.io.new_output(file_path) + row_group_size = PropertyUtil.property_as_int( + properties=table.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, + ) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: + writer.write_table(task.df, row_group_size=row_group_size) + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(fo), + # After this has been fixed: + # https://github.com/apache/iceberg-python/issues/271 + # sort_order_id=task.sort_order_id, + sort_order_id=None, + # Just copy these from the table for now + spec_id=table.spec().spec_id, + equality_ids=None, + key_metadata=None, + ) - try: - _ = next(tasks) - # If there are more tasks, raise an exception - raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") - except StopIteration: - pass + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(table.schema(), table.properties), + 
parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + data_files.append(data_file) + return iter(data_files) - parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) - file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' - file_schema = schema_to_pyarrow(table.schema()) +def bin_pack_arrow_table(tbl: pa.Table) -> List[pa.Table]: + # bin-pack the table into 256 MB chunks + from pyiceberg.utils.bin_packing import PackingIterator - fo = table.io.new_output(file_path) - row_group_size = PropertyUtil.property_as_int( - properties=table.properties, - property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, - default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, - ) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, **parquet_writer_kwargs) as writer: - writer.write_table(task.df, row_group_size=row_group_size) - - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=len(fo), - # After this has been fixed: - # https://github.com/apache/iceberg-python/issues/271 - # sort_order_id=task.sort_order_id, - sort_order_id=None, - # Just copy these from the table for now - spec_id=table.spec().spec_id, - equality_ids=None, - key_metadata=None, - ) - - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(table.schema(), table.properties), - parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), - ) - return iter([data_file]) + splits = tbl.to_batches() + target_weight = 2 << 27 # 256 MB + bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True) + return [pa.Table.from_batches(bin) for bin in bin_packed] Review Comment: sg. I did this so that I can change as little as possible to `WriteTask` and `write_file`. 
I'll make the change to use `RecordBatch` instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org