Fokko commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1547810906
##########
pyiceberg/manifest.py:
##########
@@ -289,10 +286,7 @@ def partition_field_to_data_file_partition_field(partition_field_type: IcebergTy
 @partition_field_to_data_file_partition_field.register(LongType)
-@partition_field_to_data_file_partition_field.register(DateType)

Review Comment:
This single-dispatch is there only for the `TimeType` it seems. Probably we should also convert those into a native type.
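For context, Iceberg stores `time` values as microseconds since midnight, so converting to "a native type" here would mean carrying a plain `int`. A minimal sketch of that conversion (the helper name is illustrative, not pyiceberg's actual API):

```python
from datetime import time


# Illustrative only: convert a Python time into Iceberg's native representation,
# microseconds since midnight, so no special partition-field dispatch is needed.
def time_to_micros(value: time) -> int:
    return ((value.hour * 60 + value.minute) * 60 + value.second) * 1_000_000 + value.microsecond


print(time_to_micros(time(19, 25, 0)))  # 69900000000
```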
##########
tests/conftest.py:
##########
@@ -2000,7 +2000,11 @@ def spark() -> "SparkSession":
     'float': [0.0, None, 0.9],
     'double': [0.0, None, 0.9],
     'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
-    'timestamptz': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
+    'timestamptz': [

Review Comment:
Nice one!

##########
pyiceberg/table/__init__.py:
##########
@@ -3111,3 +3147,112 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(arrow_table: pa.Table, partition_columns: list[str]) -> pa.Table:
+    """Given a table, sort it by current partition scheme."""
+    # only works for identity for now
+    sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+    sorted_arrow_table = arrow_table.sort_by(sorting=sort_options['sort_keys'], null_placement=sort_options['null_placement'])
+    return sorted_arrow_table
+
+
+def get_partition_columns(
+    spec: PartitionSpec,
+    schema: Schema,
+) -> list[str]:
+    partition_cols = []
+    for partition_field in spec.fields:
+        column_name = schema.find_column_name(partition_field.source_id)
+        if not column_name:
+            raise ValueError(f"{partition_field=} could not be found in {schema}.")
+        partition_cols.append(column_name)
+    return partition_cols
+
+
+def _get_table_partitions(
+    arrow_table: pa.Table,
+    partition_spec: PartitionSpec,
+    schema: Schema,
+    slice_instructions: list[dict[str, Any]],
+) -> list[TablePartition]:
+    sorted_slice_instructions = sorted(slice_instructions, key=lambda x: x['offset'])
+
+    partition_fields = partition_spec.fields
+
+    offsets = [inst["offset"] for inst in sorted_slice_instructions]
+    projected_and_filtered = {
+        partition_field.source_id: arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
+        .take(offsets)
+        .to_pylist()
+        for partition_field in partition_fields
+    }
+
+    table_partitions = []
+    for idx, inst in enumerate(sorted_slice_instructions):
+        partition_slice = arrow_table.slice(**inst)
+        fieldvalues = [
+            PartitionFieldValue(partition_field, projected_and_filtered[partition_field.source_id][idx])
+            for partition_field in partition_fields
+        ]
+        partition_key = PartitionKey(raw_partition_field_values=fieldvalues, partition_spec=partition_spec, schema=schema)
+        table_partitions.append(TablePartition(partition_key=partition_key, arrow_table_partition=partition_slice))
+    return table_partitions
+
+
+def partition(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> Iterable[TablePartition]:

Review Comment:
It would be good to have a bit more descriptive function names.
I also think we should hide this from the outside user.
```suggestion
def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[TablePartition]:
```
I think we can also return a list, so folks know that it is already materialized.

##########
tests/conftest.py:
##########
@@ -2045,3 +2049,19 @@ def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
     """PyArrow table with all kinds of columns"""
     return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
+
+
+@pytest.fixture(scope="session")
+def arrow_table_without_data(pa_schema: "pa.Schema") -> "pa.Table":
+    import pyarrow as pa
+
+    """PyArrow table with all kinds of columns."""

Review Comment:
```suggestion
    """PyArrow table with all kinds of columns."""
    import pyarrow as pa
```

##########
pyiceberg/table/__init__.py:
##########
@@ -1131,8 +1133,11 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         if not isinstance(df, pa.Table):
             raise ValueError(f"Expected PyArrow table, got: {df}")
-        if len(self.spec().fields) > 0:
-            raise ValueError("Cannot write to partitioned tables")
+        supported = {IdentityTransform}

Review Comment:
Nit:
```suggestion
        supported_transforms = {IdentityTransform}
```

##########
pyiceberg/table/__init__.py:
##########
@@ -2529,25 +2546,44 @@ def _dataframe_to_data_files(
     """
     from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
-    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
-        raise ValueError("Cannot write to partitioned tables")
-
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
-
     target_file_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
+    if target_file_size is None:
+        raise ValueError(
+            "Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."
+        )
-    # This is an iter, so we don't have to materialize everything every time
-    # This will be more relevant when we start doing partitioned writes
-    yield from write_file(
-        io=io,
-        table_metadata=table_metadata,
-        tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, target_file_size)]),  # type: ignore
-    )
+    if len(table_metadata.spec().fields) > 0:
+        partitions = partition(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
+        yield from write_file(
+            io=io,
+            table_metadata=table_metadata,
+            tasks=iter([
+                WriteTask(
+                    write_uuid=write_uuid,
+                    task_id=next(counter),
+                    record_batches=batches,
+                    partition_key=partition.partition_key,
+                    schema=table_metadata.schema(),
+                )
+                for partition in partitions
+                for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)

Review Comment:
This looks very nice!

##########
pyiceberg/table/__init__.py:
##########
@@ -2492,16 +2497,28 @@ def _add_and_move_fields(
 class WriteTask:
     write_uuid: uuid.UUID
     task_id: int
+    schema: Schema
     record_batches: List[pa.RecordBatch]
     sort_order_id: Optional[int] = None
+    partition_key: Optional[PartitionKey] = None
-    # Later to be extended with partition information
+    def generate_data_file_partition_path(self) -> str:

Review Comment:
Nit: This function looks redundant. The check is being done in `generate_data_file_path()` as well. I would merge those two.
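A minimal sketch of what merging the two helpers could look like, using simplified stand-ins for `WriteTask` and `PartitionKey` (the `to_path()` method and the filename format below are assumptions for illustration, not necessarily the PR's exact API):

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakePartitionKey:
    # Stand-in for PartitionKey; assumes it can render itself as a path segment.
    path: str

    def to_path(self) -> str:
        return self.path


@dataclass(frozen=True)
class FakeWriteTask:
    write_uuid: uuid.UUID
    task_id: int
    partition_key: Optional[FakePartitionKey] = None

    def generate_data_file_filename(self, extension: str) -> str:
        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"

    def generate_data_file_path(self, extension: str) -> str:
        # Merged helper: prefix the filename with the partition path only when the
        # task is partitioned, so a separate partition-path method is not needed.
        if self.partition_key:
            return f"{self.partition_key.to_path()}/{self.generate_data_file_filename(extension)}"
        return self.generate_data_file_filename(extension)


task = FakeWriteTask(write_uuid=uuid.uuid4(), task_id=0, partition_key=FakePartitionKey("id=1"))
print(task.generate_data_file_path("parquet"))  # e.g. id=1/00000-0-<uuid>.parquet
```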
##########
tests/conftest.py:
##########
@@ -2045,3 +2049,19 @@ def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
     """PyArrow table with all kinds of columns"""
     return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
+
+
+@pytest.fixture(scope="session")
+def arrow_table_without_data(pa_schema: "pa.Schema") -> "pa.Table":
+    import pyarrow as pa
+
+    """PyArrow table with all kinds of columns."""
+    return pa.Table.from_pylist([], schema=pa_schema)
+
+
+@pytest.fixture(scope="session")
+def arrow_table_with_only_nulls(pa_schema: "pa.Schema") -> "pa.Table":
+    import pyarrow as pa
+
+    """PyArrow table with all kinds of columns."""

Review Comment:
```suggestion
    """PyArrow table with all kinds of columns."""
    import pyarrow as pa
```

##########
pyiceberg/typedef.py:
##########
@@ -199,3 +199,7 @@ def __repr__(self) -> str:
     def record_fields(self) -> List[str]:
         """Return values of all the fields of the Record class except those specified in skip_fields."""
         return [self.__getattribute__(v) if hasattr(self, v) else None for v in self._position_to_field_name]
+
+    def __hash__(self) -> int:
+        """Return hash value of the Record class."""
+        return hash(str(self))

Review Comment:
I believe that the str repr is broken currently. It is always `Record[]`. I think the following should work:
```suggestion
        return hash(self.__dict__.values())
```

##########
pyiceberg/table/__init__.py:
##########
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
     """
     from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
-    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
-        raise ValueError("Cannot write to partitioned tables")
-
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
-
     target_file_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
+    if target_file_size is None:
+        raise ValueError(
+            "Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."

Review Comment:
I like that as well, the `ValueError` is misleading and it is not directly obvious why we would raise it.
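A hedged sketch of the underlying point: if the lookup always receives a non-`None` default, it can return a plain `int`, and neither the `None` check nor the misleading `ValueError` is needed. The helper below is illustrative only, not pyiceberg's actual `PropertyUtil`:

```python
from typing import Dict


# Illustrative only: a property lookup with a required default never returns None,
# so the caller does not have to raise when the property is absent.
def property_as_int(properties: Dict[str, str], name: str, default: int) -> int:
    value = properties.get(name)
    return int(value) if value is not None else default


print(property_as_int({}, "write.target-file-size-bytes", 512 * 1024 * 1024))  # 536870912
```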