jqin61 opened a new pull request, #582:
URL: https://github.com/apache/iceberg-python/pull/582
**Background**

When doing a static overwrite, we can either overwrite the full table or overwrite only some partitions of it. The Spark SQL counterpart for a full-table overwrite (see [Iceberg Spark static overwrite](https://iceberg.apache.org/docs/latest/spark-writes/#static-overwrite)) is:

```
INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid
```

And the counterpart for overwriting a specified partition is:

```
INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid
```

**Goal**

When overwriting the table, the caller can provide an expression as `overwrite_filter` to cover both cases. This PR validates that the filter expression conforms to certain rules: it must be on partition columns that do not use hidden partitioning, and the fields in the filter must agree with the input Arrow table so that the dataframe does not include values that the filter does not specify. A sketch of the intended usage follows the rules list below.

**Rules and Test Cases**

1. **Rule:** The expression may only use **IsNull** or **EqualTo** as building blocks, concatenated by **And**.
   **Tests:** `test__validate_static_overwrite_filter_expr_type` parametrize 1-8
2. **Rule:** The building-block predicates (IsNull and EqualTo) must not have conflicting values.
   **Tests:** `test__validate_static_overwrite_filter_expr_type` parametrize 9-11
3. **Rule:** The terms (fields) must refer to existing fields in the Iceberg schema, and the literal in a predicate (if any) must match the Iceberg field type. In other words, the expression can be bound to the table schema successfully.
   **Tests:** `test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_schema_fields_in_filter`, `test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_incompatible_predicate_value`
4. **Rule:** If the expression specifies a field that is required in the Iceberg schema, that field must not appear under IsNull in the expression.
   **Tests:** `test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_non_nullable`
5. **Rule:** The fields in the expression must be partition columns.
   **Tests:** `test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_part_fields_in_filter`
6. **Rule:** The Iceberg table fields specified in the expression must not use hidden partitioning; the non-specified fields may.
   **Tests:** `test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_identity_transorm_filter`, `test__bind_and_validate_static_overwrite_filter_predicate_succeeds_on_an_identity_transform_field_although_table_has_other_hidden_partition_fields`
7. **Rule:** The partition column values in the dataframe must conform to the filter. (To be implemented in the static overwrite function when partition keys are extracted.)
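To make the goal concrete, here is a minimal sketch of the intended usage. The `overwrite_filter` keyword is what this PR works toward; the exact signature shown is illustrative, not the final API:

```
# A sketch of the intended usage, assuming the overwrite API grows an
# overwrite_filter keyword as proposed here (names are illustrative).
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo, Reference

catalog = load_catalog("default")
tbl = catalog.load_table("my_app.logs")

df = pa.Table.from_pylist([
    {"uuid": "a1", "level": "INFO", "ts": 1, "message": "started"},
])

# Full-table overwrite: replaces all existing data.
tbl.overwrite(df)

# Static partition overwrite: replaces only the level='INFO' partition;
# per Rule 7, df must contain only rows where level == 'INFO'.
tbl.overwrite(df, overwrite_filter=EqualTo(Reference("level"), "INFO"))
```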
**Rule Necessity Justification using Spark Counterparts**

To better understand these rules, let us look at the Spark static overwrite crash counterparts. For these, we use the following set-up:

```
# Create Spark dataframe
from pyspark.sql.types import StructType, StructField, StringType, LongType

data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
schema = StructType([
    StructField("n_legs", LongType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=True)
])
df_multicols = spark.createDataFrame(data_multicols, schema)

# Create Iceberg table
create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols (
    n_legs bigint,
    animals string,
    color string)
USING iceberg
PARTITIONED BY (n_legs, color)
"""
spark.sql(create_sql)

# Insert initial data
df_multicols.createOrReplaceTempView("tmp_view")
sql_cmd = f"""INSERT INTO lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
SELECT * FROM tmp_view
"""
spark.sql(sql_cmd)
```

This gives the following table schema:

| col_name     | data_type | comment |
|--------------|-----------|---------|
| n_legs       | bigint    |         |
| animals      | string    |         |
| color        | string    |         |
| Partitioning |           |         |
| Part 0       | n_legs    |         |
| Part 1       | color     |         |

with this data:

| n_legs | animals  | color |
|--------|----------|-------|
| 2      | Flamingo | red   |
| 4      | Horse    | white |
| 4      | Pig      | pink  |

Now let us check the rules.

**Rule 1. The expression may only use IsNull or EqualTo as building blocks, concatenated by And.**

For example:

```
And(EqualTo(Reference("foo"), "hello"), And(IsNull(Reference("baz")), EqualTo(Reference("boo"), "hello")))
```

or

```
"foo = 'hello' AND (baz IS NULL AND boo = 'hello')"
```

Spark counterpart example:

```
sql_cmd = f"""INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
PARTITION (n_legs > 2)
SELECT color, animals FROM tmp_view
"""
spark.sql(sql_cmd)
```

gives:

```
mismatched input '>' expecting {')', ','}(line 3, pos 22)

== SQL ==
INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
PARTITION (n_legs > 2)
----------------------^^^
SELECT color,animals FROM tmp_view
```

Other predicates such as `IN` and `!=`, and other expressions such as `Or`, give similar errors.

**Rule 2. The building-block predicates (IsNull and EqualTo) must not have conflicting values.**

This means

```
And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "bye"))
```

and

```
And(EqualTo(Reference("foo"), "hello"), IsNull(Reference("foo")))
```

are not allowed. However,

```
And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "hello"))
```

is allowed and shall be deduplicated.

Spark counterpart example:

```
sql_cmd = f"""INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
PARTITION (color='red', color='green')
SELECT animals, n_legs FROM tmp_view
"""
spark.sql(sql_cmd)
```

gives:

```
ParseException: Found duplicate keys 'color'.(line 3, pos 4)

== SQL ==
INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
PARTITION (color='red', color='green')
----^^^
SELECT animals,n_legs FROM tmp_view
```
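A minimal sketch of how such a conflict check could look in PyIceberg terms (a hypothetical helper for illustration, not this PR's actual implementation):

```
# Hypothetical helper illustrating Rules 1 and 2: flatten an And-tree of
# EqualTo/IsNull predicates and reject conflicting values on the same field.
from typing import Any, Dict

from pyiceberg.expressions import And, BooleanExpression, EqualTo, IsNull


def collect_filter_values(expr: BooleanExpression) -> Dict[str, Any]:
    if isinstance(expr, And):
        merged = collect_filter_values(expr.left)
        for name, value in collect_filter_values(expr.right).items():
            if name in merged and merged[name] != value:
                raise ValueError(f"Conflicting filter values for field {name!r}")
            merged[name] = value  # identical duplicates deduplicate naturally
        return merged
    if isinstance(expr, EqualTo):
        return {expr.term.name: expr.literal.value}
    if isinstance(expr, IsNull):
        return {expr.term.name: None}  # None marks IS NULL
    raise ValueError(f"Only And, EqualTo and IsNull are supported, got: {expr}")
```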
**Rule 3. The terms (fields) must refer to existing fields in the Iceberg schema, and the literal in a predicate (if any) must match the Iceberg field type. In other words, the expression can be bound to the table schema successfully.**

Spark counterpart example:

```
sql_cmd = f"""INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
PARTITION (not_a_field='red')
SELECT animals, n_legs FROM tmp_view
"""
spark.sql(sql_cmd)
```

gives:

```
AnalysisException: PARTITION clause cannot contain a non-partition column name: not_a_field
```

**Rule 4. If the expression specifies a field that is required in the Iceberg schema, that field must not appear under IsNull in the expression.**

Spark counterpart example:

```
# Create Spark dataframe with a non-nullable column
from pyspark.sql.types import StructType, StructField, StringType, LongType

data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
schema = StructType([
    StructField("n_legs", LongType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=False)  # Mark as non-nullable
])
df_multicols = spark.createDataFrame(data_multicols, schema)

# Create Iceberg table with a non-nullable column
create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols (
    n_legs bigint,
    animals string,
    color string not NULL)
USING iceberg
PARTITIONED BY (n_legs, color)
"""
spark.sql(create_sql)

# Insert initial data
df_multicols.createOrReplaceTempView("tmp_view")
sql_cmd = f"""INSERT INTO lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
SELECT * FROM tmp_view
"""
spark.sql(sql_cmd)

sql_cmd = f"""INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
PARTITION (color=null)
SELECT animals, n_legs FROM tmp_view
"""
spark.sql(sql_cmd)
```

gives:

```
AnalysisException: Cannot write incompatible data to table 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols':
- Cannot safely cast 'n_legs': string to bigint
- Cannot write nullable values to non-null column 'color'
```

**Rule 5. The fields in the expression must be partition columns.**

Spark counterpart example:

```
sql_cmd = f"""INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
PARTITION (animals='pig')
SELECT n_legs, color FROM tmp_view
"""
spark.sql(sql_cmd)
```

gives:

```
AnalysisException: PARTITION clause cannot contain a non-partition column name: animals
```

**Rule 6. The Iceberg table fields specified in the expression must not use hidden partitioning; the non-specified fields may.**

Spark counterpart example:

```
create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms (
    n_legs bigint,
    animals string,
    color string
)
USING iceberg
PARTITIONED BY (n_legs, truncate(color, 1))
"""
spark.sql(create_sql)

sql_cmd = f"""INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
PARTITION (color='red')
SELECT n_legs, animals FROM tmp_view
"""
spark.sql(sql_cmd)
```

gives:

```
AnalysisException: PARTITION clause cannot contain a non-partition column name: color
```

However, if we specify the other partition column, which uses the identity transform, it works:

```
sql_cmd = f"""INSERT OVERWRITE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
PARTITION (n_legs=1)
SELECT color, animals FROM tmp_view
"""
spark.sql(sql_cmd)
```
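On the PyIceberg side, Rules 5 and 6 boil down to inspecting the table's partition spec. A rough sketch of that check, assuming the filter has already been bound to the schema so field IDs are available (a hypothetical helper, not this PR's exact code):

```
# Hypothetical check for Rules 5 and 6: every field referenced by the filter
# must be the source of a partition field that uses the identity transform.
from typing import Set

from pyiceberg.table import Table
from pyiceberg.transforms import IdentityTransform


def validate_filter_fields_are_identity_partitions(table: Table, filter_field_ids: Set[int]) -> None:
    identity_sources = {
        part_field.source_id
        for part_field in table.spec().fields
        if isinstance(part_field.transform, IdentityTransform)
    }
    for field_id in filter_field_ids:
        if field_id not in identity_sources:
            raise ValueError(
                f"Field id {field_id} is not an identity-transformed partition column"
            )
```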