syun64 commented on code in PR #555:
URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544039629


##########
pyiceberg/table/__init__.py:
##########
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+    """Given a table sort it by current partition scheme with all transform 
functions supported."""
+    from pyiceberg.transforms import IdentityTransform
+
+    supported = {IdentityTransform}
+    if not all(
+        type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns
+    ):
+        raise ValueError(
+            f"Not all transforms are supported, get: {[transform in supported 
for transform in iceberg_table_metadata.spec().fields]}."
+        )
+
+    # only works for identity
+    sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+    sorted_arrow_table = 
arrow_table.sort_by(sorting=sort_options['sort_keys'], 
null_placement=sort_options['null_placement'])
+    return sorted_arrow_table
+
+
+def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: 
pa.Table) -> list[str]:
+    arrow_table_cols = set(arrow_table.column_names)
+    partition_cols = []
+    for transform_field in iceberg_table_metadata.spec().fields:
+        column_name = 
iceberg_table_metadata.schema().find_column_name(transform_field.source_id)
+        if not column_name:
+            raise ValueError(f"{transform_field=} could not be found in 
{iceberg_table_metadata.schema()}.")
+        if column_name not in arrow_table_cols:
+            continue
+        partition_cols.append(column_name)
+    return partition_cols
+
+
+def _get_table_partitions(
+    arrow_table: pa.Table,
+    partition_spec: PartitionSpec,
+    schema: Schema,
+    slice_instructions: list[dict[str, Any]],
+) -> list[TablePartition]:
+    sorted_slice_instructions = sorted(slice_instructions, key=lambda x: 
x['offset'])
+
+    partition_fields = partition_spec.fields
+
+    offsets = [inst["offset"] for inst in sorted_slice_instructions]
+    projected_and_filtered = {
+        partition_field.source_id: 
arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
+        .take(offsets)
+        .to_pylist()
+        for partition_field in partition_fields
+    }
+
+    table_partitions = []
+    for inst in sorted_slice_instructions:
+        partition_slice = arrow_table.slice(**inst)
+        fieldvalues = [
+            PartitionFieldValue(partition_field, 
projected_and_filtered[partition_field.source_id][inst["offset"]])
+            for partition_field in partition_fields
+        ]
+        partition_key = PartitionKey(raw_partition_field_values=fieldvalues, 
partition_spec=partition_spec, schema=schema)
+        table_partitions.append(TablePartition(partition_key=partition_key, 
arrow_table_partition=partition_slice))
+
+    return table_partitions
+
+
+def partition(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> 
Iterable[TablePartition]:
+    """Based on the iceberg table partition spec, slice the arrow table into 
partitions with their keys.
+
+    Example:
+    Input:
+    An arrow table with partition key of ['n_legs', 'year'] and with data of
+    {'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
+     'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
+     'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", 
"Horse","Brittle stars", "Centipede"]}.
+    The algrithm:
+    Firstly we group the rows into partitions by sorting with sort order 
[('n_legs', 'descending'), ('year', 'descending')]
+    and null_placement of "at_end".
+    This gives the same table as raw input.
+    Then we sort_indices using reverse order of [('n_legs', 'descending'), 
('year', 'descending')]
+    and null_placement : "at_start".
+    This gives:
+    [8, 7, 4, 5, 6, 3, 1, 2, 0]
+    Based on this we get partition groups of indices:
+    [{'offset': 8, 'length': 1}, {'offset': 7, 'length': 1}, {'offset': 4, 
'length': 3}, {'offset': 3, 'length': 1}, {'offset': 1, 'length': 2}, 
{'offset': 0, 'length': 1}]
+    We then retrieve the partition keys by offsets.
+    And slice the arrow table by offsets and lengths of each partition.
+    """
+    import pyarrow as pa
+
+    partition_columns = get_partition_columns(iceberg_table_metadata, 
arrow_table)

Review Comment:
   How do you feel about this suggestion? Most of this function's 
responsibility seems to lie in making sure that the partition field is provided 
in the arrow_table, but we seem to already be [checking the 
schema](https://github.com/apache/iceberg-python/blob/6aeb12620eaf1caaf9ed5287d1118227bb419886/pyiceberg/table/__init__.py#L1132)
 in the write functions now.
   
   ```suggestion
       partition_columns = 
[iceberg_table_metadata.schema().find_column_name(partition_field.source_id) 
for partition_field in iceberg_table_metadata.spec().fields]
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+    """Given a table sort it by current partition scheme with all transform 
functions supported."""

Review Comment:
   ```suggestion
       """Given a table, sort it by current partition scheme."""
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
     """
     from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-        raise ValueError("Cannot write to partitioned tables")
-
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
-
     target_file_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
+    if target_file_size is None:
+        raise ValueError(
+            "Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."
+        )
 
-    # This is an iter, so we don't have to materialize everything every time
-    # This will be more relevant when we start doing partitioned writes
-    yield from write_file(
-        io=io,
-        table_metadata=table_metadata,
-        tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches 
in bin_pack_arrow_table(df, target_file_size)]),  # type: ignore
-    )
+    if any(len(spec.fields) > 0 for spec in table_metadata.partition_specs):

Review Comment:
   Great find!



##########
pyiceberg/table/__init__.py:
##########
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+    """Given a table sort it by current partition scheme with all transform 
functions supported."""
+    from pyiceberg.transforms import IdentityTransform
+
+    supported = {IdentityTransform}
+    if not all(
+        type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns
+    ):
+        raise ValueError(
+            f"Not all transforms are supported, get: {[transform in supported 
for transform in iceberg_table_metadata.spec().fields]}."
+        )
+
+    # only works for identity
+    sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+    sorted_arrow_table = 
arrow_table.sort_by(sorting=sort_options['sort_keys'], 
null_placement=sort_options['null_placement'])
+    return sorted_arrow_table
+
+
+def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: 
pa.Table) -> list[str]:
+    arrow_table_cols = set(arrow_table.column_names)
+    partition_cols = []
+    for transform_field in iceberg_table_metadata.spec().fields:

Review Comment:
   How do you feel about using `partition_field` instead? I feel that that is 
the name we are using in other places (like in the 
[PartitionKey](https://github.com/apache/iceberg-python/blob/6aeb12620eaf1caaf9ed5287d1118227bb419886/pyiceberg/partitioning.py#L378)
 data class)
   
   ```suggestion
       for partition_field in iceberg_table_metadata.spec().fields:
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -2489,16 +2488,28 @@ def _add_and_move_fields(
 class WriteTask:
     write_uuid: uuid.UUID
     task_id: int
+    schema: Schema
     record_batches: List[pa.RecordBatch]
     sort_order_id: Optional[int] = None
+    partition_key: Optional[PartitionKey] = None
 
-    # Later to be extended with partition information
+    def generate_data_file_partition_path(self) -> str:
+        if self.partition_key is None:
+            raise ValueError("Cannot generate partition path based on 
non-partitioned WriteTask")
+        return self.partition_key.to_path()
 
     def generate_data_file_filename(self, extension: str) -> str:
         # Mimics the behavior in the Java API:
         # 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
         return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
 
+    def generate_data_file_path(self, extension: str) -> str:
+        if self.partition_key:
+            file_path = f"{self.generate_data_file_partition_path()}/{self. 
generate_data_file_filename(extension)}"

Review Comment:
   small typo here
   ```suggestion
               file_path = 
f"{self.generate_data_file_partition_path()}/{self.generate_data_file_filename(extension)}"
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
     """
     from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-        raise ValueError("Cannot write to partitioned tables")
-
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
-
     target_file_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
+    if target_file_size is None:
+        raise ValueError(
+            "Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."
+        )
 
-    # This is an iter, so we don't have to materialize everything every time
-    # This will be more relevant when we start doing partitioned writes
-    yield from write_file(
-        io=io,
-        table_metadata=table_metadata,
-        tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches 
in bin_pack_arrow_table(df, target_file_size)]),  # type: ignore
-    )
+    if any(len(spec.fields) > 0 for spec in table_metadata.partition_specs):

Review Comment:
   Do we want to check all the previous partition specs as well, or do we only 
care whether the most recent spec is partitioned when we are writing the new 
data files with the record batches?
   
   ```suggestion
       if len(table_metadata.spec().fields) > 0:
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+    """Given a table sort it by current partition scheme with all transform 
functions supported."""
+    from pyiceberg.transforms import IdentityTransform
+
+    supported = {IdentityTransform}
+    if not all(
+        type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns

Review Comment:
   This condition looks like it would always succeed, because 
`iceberg_table_metadata.spec().fields` is a `List[PartitionField]` while 
`partition_columns` is a `list[str]`. A `PartitionField` will therefore never 
be in `partition_columns`, so the filtered generator is always empty and 
`all([]) == True`.



##########
pyiceberg/table/__init__.py:
##########
@@ -2526,25 +2537,44 @@ def _dataframe_to_data_files(
     """
     from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
-    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
-        raise ValueError("Cannot write to partitioned tables")
-
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
-
     target_file_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
+    if target_file_size is None:
+        raise ValueError(
+            "Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES 
nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."

Review Comment:
   I have mixed feelings about this exception check, because we are already 
setting the default value of `target_file_size` to 
`TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT` in the call right above 
it, which makes the check feel redundant.
   
   I understand why we are doing it though: 
   
   `PropertyUtil.property_as_int` returns `Optional[int]`, and bin_packing 
expects an int, so we need to type check it. 
   
   If we run into more of these type-checking redundancies in the code base, 
where we use property values that are always expected to have a non-null 
default value, maybe we should refactor `PropertyUtil` instead. Maybe we can 
have two methods: `property_as_int`, which returns an `Optional[int]`, and 
`property_as_int_with_default`, which returns an `int`?



##########
pyiceberg/table/__init__.py:
##########
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+    """Given a table sort it by current partition scheme with all transform 
functions supported."""
+    from pyiceberg.transforms import IdentityTransform
+
+    supported = {IdentityTransform}
+    if not all(
+        type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns
+    ):
+        raise ValueError(
+            f"Not all transforms are supported, get: {[transform in supported 
for transform in iceberg_table_metadata.spec().fields]}."
+        )
+
+    # only works for identity
+    sort_options = _get_partition_sort_order(partition_columns, reverse=False)
+    sorted_arrow_table = 
arrow_table.sort_by(sorting=sort_options['sort_keys'], 
null_placement=sort_options['null_placement'])
+    return sorted_arrow_table
+
+
+def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: 
pa.Table) -> list[str]:
+    arrow_table_cols = set(arrow_table.column_names)
+    partition_cols = []
+    for transform_field in iceberg_table_metadata.spec().fields:
+        column_name = 
iceberg_table_metadata.schema().find_column_name(transform_field.source_id)
+        if not column_name:
+            raise ValueError(f"{transform_field=} could not be found in 
{iceberg_table_metadata.schema()}.")
+        if column_name not in arrow_table_cols:
+            continue
+        partition_cols.append(column_name)
+    return partition_cols
+
+
+def _get_table_partitions(
+    arrow_table: pa.Table,
+    partition_spec: PartitionSpec,
+    schema: Schema,
+    slice_instructions: list[dict[str, Any]],
+) -> list[TablePartition]:
+    sorted_slice_instructions = sorted(slice_instructions, key=lambda x: 
x['offset'])
+
+    partition_fields = partition_spec.fields
+
+    offsets = [inst["offset"] for inst in sorted_slice_instructions]
+    projected_and_filtered = {
+        partition_field.source_id: 
arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
+        .take(offsets)
+        .to_pylist()
+        for partition_field in partition_fields
+    }
+
+    table_partitions = []
+    for inst in sorted_slice_instructions:
+        partition_slice = arrow_table.slice(**inst)
+        fieldvalues = [
+            PartitionFieldValue(partition_field, 
projected_and_filtered[partition_field.source_id][inst["offset"]])
+            for partition_field in partition_fields
+        ]
+        partition_key = PartitionKey(raw_partition_field_values=fieldvalues, 
partition_spec=partition_spec, schema=schema)
+        table_partitions.append(TablePartition(partition_key=partition_key, 
arrow_table_partition=partition_slice))
+
+    return table_partitions
+
+
+def partition(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> 
Iterable[TablePartition]:

Review Comment:
   It looks like we only need the `PartitionSpec`, and the iceberg table 
`Schema` in this operation. How do you feel about updating the input arguments, 
and leaving it to the caller to pass in the correct `partition_spec` and the 
`schema` into this function?
   
   I think that will also allow us to unit test this complicated function 
more easily, so that we can clearly define its behavior when we eventually 
extend it to more complex partitions with Transforms. Defining the 
`PartitionSpec` and `Schema` is fairly simple, whereas defining a 
`TableMetadata` is a lot more involved.
   
   ```suggestion
   def partition(partition_spec: PartitionSpec, iceberg_table_schema: Schema, 
arrow_table: pa.Table) -> Iterable[TablePartition]:
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+
+@dataclass(frozen=True)
+class TablePartition:
+    partition_key: PartitionKey
+    arrow_table_partition: pa.Table
+
+
+def _get_partition_sort_order(partition_columns: list[str], reverse: bool = 
False) -> dict[str, Any]:
+    order = 'ascending' if not reverse else 'descending'
+    null_placement = 'at_start' if reverse else 'at_end'
+    return {'sort_keys': [(column_name, order) for column_name in 
partition_columns], 'null_placement': null_placement}
+
+
+def group_by_partition_scheme(
+    iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, 
partition_columns: list[str]
+) -> pa.Table:
+    """Given a table sort it by current partition scheme with all transform 
functions supported."""
+    from pyiceberg.transforms import IdentityTransform
+
+    supported = {IdentityTransform}
+    if not all(
+        type(field.transform) in supported for field in 
iceberg_table_metadata.spec().fields if field in partition_columns
+    ):
+        raise ValueError(
+            f"Not all transforms are supported, get: {[transform in supported 
for transform in iceberg_table_metadata.spec().fields]}."

Review Comment:
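   This error message may need some reviewing too, for similar reasons as 
above :) It iterates over the partition fields rather than their transforms, 
so `transform in supported` produces a list of booleans instead of naming the 
unsupported transforms.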



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to