sungwy commented on code in PR #931: URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1766801854
########## mkdocs/docs/api.md: ##########

@@ -353,6 +353,127 @@ lat: [[52.371807,37.773972,53.11254],[53.21917]]
 long: [[4.896029,-122.431297,6.0989],[6.56667]]
 ```
+### Partial overwrites
+
+You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.

Review Comment:
```suggestion
When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table.
```

########## mkdocs/docs/api.md: ##########

@@ -353,6 +353,127 @@ lat: [[52.371807,37.773972,53.11254],[53.21917]]
 long: [[4.896029,-122.431297,6.0989],[6.56667]]
 ```
+### Partial overwrites
+
+You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.
+
+For example, with an iceberg table created as:
+
+```python
+from pyiceberg.catalog import load_catalog
+
+catalog = load_catalog("default")
+
+from pyiceberg.schema import Schema
+from pyiceberg.types import NestedField, StringType, DoubleType
+
+schema = Schema(
+    NestedField(1, "city", StringType(), required=False),
+    NestedField(2, "lat", DoubleType(), required=False),
+    NestedField(3, "long", DoubleType(), required=False),
+)
+
+tbl = catalog.create_table("default.cities", schema=schema)
+```
+
+And with initial data populating the table:
+
+```python
+import pyarrow as pa
+df = pa.Table.from_pylist(
+    [
+        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
+        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
+        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
+        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
+    ],
+)
+tbl.append(df)
+```
+
+You can overwrite the record of `Paris` with a record of `New York`:
+
+```python
+from pyiceberg.expressions import EqualTo
+df = pa.Table.from_pylist(
+    [
+        {"city": "New York", "lat": 40.7128, "long": 74.0060},
+    ]
+)
+tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
+```
+
+This results in such data if data is printed by `tbl.scan().to_arrow()`:
+
+```python
+pyarrow.Table
+city: large_string
+lat: double
+long: double
+----
+city: [["New York"],["Amsterdam","Drachten","Paris"]]
+lat: [[40.7128],[52.371807,53.11254,48.864716]]
+long: [[74.006],[4.896029,6.0989,2.349014]]
+```
+
+If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.

Review Comment:
```suggestion
If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided arrow table.
```
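As background for the suggested wording: the overwrite example quoted above boils down to delete-then-append. Here is a minimal plain-Python sketch of those semantics; it is an illustration only, not the PyIceberg implementation, and the helper name and list-of-dicts "table" are made up for this sketch.

```python
# Sketch of overwrite-with-filter semantics: rows matching the filter
# are deleted, then the new rows are appended. Illustration only, not
# how PyIceberg implements it (real tables write Iceberg snapshots).

def overwrite_with_filter(rows, new_rows, predicate):
    kept = [r for r in rows if not predicate(r)]  # delete matching rows
    return kept + list(new_rows)                  # append the new data

table = [
    {"city": "Amsterdam", "lat": 52.371807},
    {"city": "Paris", "lat": 48.864716},
]
new = [{"city": "New York", "lat": 40.7128}]

# Rough equivalent of overwrite_filter=EqualTo('city', 'Paris')
result = overwrite_with_filter(table, new, lambda r: r["city"] == "Paris")
# result now holds Amsterdam and New York; Paris has been replaced
```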
########## mkdocs/docs/api.md: ##########

+To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:
+
+```python
+from pyiceberg.schema import Schema
+from pyiceberg.types import DoubleType, NestedField, StringType
+
+schema = Schema(
+    NestedField(1, "city", StringType(), required=False),
+    NestedField(2, "lat", DoubleType(), required=False),
+    NestedField(3, "long", DoubleType(), required=False),
+)
+
+tbl = catalog.create_table(
+    "default.cities",
+    schema=schema,
+    partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="city_identity"))
+)
+```
+
+And then suppose the data for the partition of `"paris"` is wrong:

Review Comment:
```suggestion
And we want to overwrite the data for the partition of `"Paris"`:
```
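The dynamic partition overwrite flow discussed above (detect the partitions present in the incoming data, drop them, append the new rows) can be sketched in plain Python. This is an illustration of the semantics only, not the PyIceberg implementation; the helper name and list-of-dicts "table" are invented for the sketch.

```python
# Sketch of dynamic partition overwrite semantics: detect which
# partition values appear in the incoming data, drop existing rows in
# those partitions, then append the incoming rows. Illustration only.

def dynamic_partition_overwrite(rows, new_rows, partition_key):
    touched = {r[partition_key] for r in new_rows}  # partitions to replace
    kept = [r for r in rows if r[partition_key] not in touched]
    return kept + list(new_rows)

table = [
    {"city": "Amsterdam", "lat": 52.371807},
    {"city": "Paris", "lat": -48.864716},  # wrong value, to be replaced
]
corrected = [{"city": "Paris", "lat": 48.864716}]
result = dynamic_partition_overwrite(table, corrected, "city")
# the Paris partition is replaced; Amsterdam is untouched
```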
########## mkdocs/docs/api.md: ##########

+```python
+import pyarrow as pa
+
+df = pa.Table.from_pylist(
+    [
+        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
+        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
+        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
+        {"city": "Paris", "lat": -48.864716, "long": -2.349014},
+    ],
+)
+tbl.append(df)
+```
+
+Then you could use dynamic overwrite on this partition:
+
+```python
+df_corrected = pa.Table.from_pylist([
+    {"city": "Paris", "lat": 48.864716, "long": 2.349014}
+])
+tbl.dynamic_partition_overwrite(df_corrected)
+```
+
+This results in such data if data is printed by `tbl.scan().to_arrow()`:

Review Comment:
```suggestion
This produces the following result with `tbl.scan().to_arrow()`:
```
########## mkdocs/docs/api.md: ##########

+Then you could use dynamic overwrite on this partition:

Review Comment:
```suggestion
Then we can call `dynamic_partition_overwrite` with this arrow table:
```
########## mkdocs/docs/api.md: ##########

+This results in such data if data is printed by `tbl.scan().to_arrow()`:

Review Comment:
```suggestion
This produces the following result with `tbl.scan().to_arrow()`:
```

########## pyiceberg/table/__init__.py: ##########

@@ -456,6 +461,89 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         for data_file in data_files:
             append_files.append_data_file(data_file)
+
+    def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos, partition_field in enumerate(partition_fields):
+                predicate = (
+                    EqualTo(Reference(partition_field), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_field))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for overwriting existing partitions with a PyArrow table.
+
+        The function detects partition values in the provided arrow table that using the current table
+        partition spec, and deletes existing partitions matching these values. Finally, the
+        data in the table is appended to the table.

Review Comment:
```suggestion
        The function detects partition values in the provided arrow table using the current
        partition spec, and deletes existing partitions matching these values. Finally, the
        data in the table is appended to the table.
```
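The `_build_partition_predicate` helper quoted above builds an OR of per-record AND expressions, using an `IsNull` term when a partition value is `None`. A self-contained sketch of that shape, with plain Python callables standing in for PyIceberg's expression classes (the function and variable names here are illustrative, not part of the PR):

```python
# Self-contained sketch of the OR-of-ANDs predicate that
# _build_partition_predicate constructs: one term per partition field
# (equality, or an IS NULL check when the partition value is None),
# AND-ed within a partition record and OR-ed across records.

def build_partition_predicate(partition_fields, partition_records):
    def predicate(row):
        # OR across partition records (no records means "always false")
        for record in partition_records:
            # AND across the fields of a single partition record
            if all(
                (row[field] is None) if value is None else (row[field] == value)
                for field, value in zip(partition_fields, record)
            ):
                return True
        return False
    return predicate

# Records for an identity partition on "city", including a null partition
pred = build_partition_predicate(["city"], [("Paris",), (None,)])
```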
########## mkdocs/docs/api.md: ##########

+To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:

Review Comment:
```suggestion
For example, with an iceberg table with a partition specified on `"city"` field:
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org