jqin61 opened a new pull request, #582:
URL: https://github.com/apache/iceberg-python/pull/582

   **Background** 
   When we do a static overwrite, we can choose to overwrite the full table or only some of its partitions.
   
   The Spark SQL counterpart for a full-table overwrite (see [iceberg spark static overwrite](https://iceberg.apache.org/docs/latest/spark-writes/#static-overwrite)) is
   ```
   INSERT OVERWRITE prod.my_app.logs
   SELECT uuid, first(level), first(ts), first(message)
   FROM prod.my_app.logs
   WHERE level = 'INFO'
   GROUP BY uuid
   ```
   And the Spark SQL counterpart for overwriting specific partitions is
   ```
   INSERT OVERWRITE prod.my_app.logs
   PARTITION (level = 'INFO')
   SELECT uuid, first(level), first(ts), first(message)
   FROM prod.my_app.logs
   WHERE level = 'INFO'
   GROUP BY uuid
   ```
   **Goal**
   When overwriting the table, we can provide an expression as `overwrite_filter` to cover these two cases. This PR validates that the filter expression conforms to certain rules: it must only reference partition columns that do not use hidden partitioning, and the fields in the filter must agree with the input Arrow table, so that the dataframe does not contain values the filter does not specify.
   
   **Rules and Test Cases**
   1. **Rule :** The expression may only use **IsNull** or **EqualTo** as building blocks, concatenated by **And** (a validation sketch follows this list).
        **Tests :** `test__validate_static_overwrite_filter_expr_type`, parametrize cases 1-8
   
   2. **Rule :** The building block predicates (IsNull and EqualTo) must not have conflicting values.
        **Tests :** `test__validate_static_overwrite_filter_expr_type`, parametrize cases 9-11
   
   3. **Rule :** The terms (fields) must refer to existing fields in the Iceberg schema, and any literal in a predicate must match the Iceberg field type; in other words, the expression must bind successfully against the table schema.
        **Tests :**
   `test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_schema_fields_in_filter`
   `test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_incompatible_predicate_value`
   
   4. **Rule :** If the expression specifies a field that is required in the Iceberg schema, that field must not appear in an IsNull predicate.
        **Tests :** `test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_non_nullable`
   
   5. **Rule :** The fields in the expression must be partition columns.
        **Tests :** `test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_part_fields_in_filter`
   
   6. **Rule :** The Iceberg table fields specified in the expression must not use hidden partitioning; non-specified fields may.
        **Tests :**
   `test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_identity_transorm_filter`
   `test__bind_and_validate_static_overwrite_filter_predicate_succeeds_on_an_identity_transform_field_although_table_has_other_hidden_partition_fields`
   
   7. **Rule :** The partition column values in the dataframe must conform to the filter. (To be implemented in the static overwrite function when partition keys are extracted.)
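
   As a sketch of rule 1's structural validation (a hypothetical helper, not necessarily this PR's exact implementation), the expression tree can be walked recursively, accepting only `And`, `EqualTo`, and `IsNull` nodes:
   ```
   from pyiceberg.expressions import And, BooleanExpression, EqualTo, IsNull, Reference

   def collect_leaf_predicates(expr: BooleanExpression) -> list:
       """Accept only And/EqualTo/IsNull nodes and return the leaf predicates (rule 1)."""
       if isinstance(expr, And):
           return collect_leaf_predicates(expr.left) + collect_leaf_predicates(expr.right)
       if isinstance(expr, (EqualTo, IsNull)):
           return [expr]
       raise ValueError(f"Only IsNull, EqualTo and And are allowed, got: {expr}")

   # Accepted: nested Ands over EqualTo/IsNull leaves
   collect_leaf_predicates(And(EqualTo(Reference("foo"), "hello"), IsNull(Reference("baz"))))
   ```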
   
   **Rule Necessity Justification using Spark Counterparts**
   To better understand these rules, let us look at how the corresponding Spark static overwrites fail. For that, we use the following setup:
   ```
   # Create Spark Dataframe
   from pyspark.sql.types import StructType, StructField, StringType, LongType
   data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
   schema = StructType([
       StructField("n_legs", LongType(), nullable=True),
       StructField("animals", StringType(), nullable=True),
       StructField("color", StringType(), nullable=True)  # Mark as non-nullable
   ])
   df_multicols = spark.createDataFrame(data_multicols, schema)
   
   # Create Iceberg Table
   create_sql = """CREATE TABLE 
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols ( 
       n_legs bigint, 
       animals string,
       color string) 
   USING iceberg
   PARTITIONED BY (n_legs, color)
   
   """
   spark.sql(create_sql)
   
   # Insert Initial data
   df_multicols.createOrReplaceTempView("tmp_view")
   sql_cmd = f"""INSERT INTO
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       SELECT * FROM  tmp_view
       """
   spark.sql(sql_cmd)
   ```
   
   This gives the following table schema:
   
   |      col_name|data_type|comment|
   |--------------|---------|-------|
   |        n_legs|   bigint|       |
   |       animals|   string|       |
   |         color|   string|       |
   
   | Partitioning|         |       |
   |--------------|---------|-------|
   |        Part 0|   n_legs|       |
   |        Part 1|    color|       |
   
   with the following data:
   |n_legs| animals|color|
   |------|--------|-----|
   |     2|Flamingo|  red|
   |     4|   Horse|white|
   |     4|     Pig| pink|
   
   
   Now let us check the rules.
   **Rule 1. The expression may only use `IsNull` or `EqualTo` as building blocks, concatenated by `And`.**
   For example:
   ```
   And(EqualTo(Reference("foo"), "hello"), And(IsNull(Reference("baz")), 
EqualTo(Reference("boo"), "hello")))
   ```
   or
   ```"foo = 'hello' AND (baz IS NULL AND boo = 'hello') ```
   
   Spark counterpart example:
   ```
   sql_cmd = f"""INSERT OVERWRITE
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       PARTITION (n_legs > 2)
       SELECT color,animals FROM  tmp_view
       """
   spark.sql(sql_cmd)
   ```
   gives:
   ```
   mismatched input '>' expecting {')', ','}(line 3, pos 22)
   
   == SQL ==
   INSERT OVERWRITE
       lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       PARTITION (n_legs > 2)
   ----------------------^^^
       SELECT color,animals FROM  tmp_view
   ```
   Other predicates such as `IN` and `!=`, and other expressions such as `Or`, give similar errors.
   
   **Rule 2. The building block predicates (IsNull and EqualTo) must not have conflicting values.**
   This means 
   ```
   And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "bye"))
   ```
   and
   ```
   And(EqualTo(Reference("foo"), "hello"), IsNull(Reference("foo"))
   ```
   are not allowed.
   
   However,
   ```
   And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "hello"))
   ```
   is allowed and shall be deduplicated.
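
   A minimal sketch of such a conflict-and-deduplication check over the leaf predicates (hypothetical helper; `collect_leaf_predicates` is the rule 1 sketch above):
   ```
   from pyiceberg.expressions import EqualTo, IsNull, Reference

   def check_conflicts(predicates: list) -> dict:
       """Each field may carry exactly one value: a literal, or None for IsNull (rule 2)."""
       seen: dict = {}
       for pred in predicates:
           name = pred.term.name
           value = pred.literal.value if isinstance(pred, EqualTo) else None
           if name in seen and seen[name] != value:
               raise ValueError(f"Conflicting predicates on field: {name}")
           seen[name] = value  # identical duplicates are deduplicated here
       return seen

   check_conflicts([EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "hello")])  # ok
   check_conflicts([EqualTo(Reference("foo"), "hello"), IsNull(Reference("foo"))])  # raises
   ```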
   
   Spark counterpart example:
   ```
   sql_cmd = f"""INSERT OVERWRITE
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       PARTITION (color='red', color='green')
       SELECT animals,n_legs FROM  tmp_view
       """
   spark.sql(sql_cmd)
   ```
   gives:
   ```
   ParseException: 
   Found duplicate keys 'color'.(line 3, pos 4)
   
   == SQL ==
   INSERT OVERWRITE
       lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       PARTITION (color='red', color='green')
   ----^^^
       SELECT animals,n_legs FROM  tmp_view
   ```
   
   **Rule 3. The terms (fields) must refer to existing fields in the Iceberg schema, and any literal in a predicate must match the Iceberg field type; that is, the expression must bind successfully against the table schema.**
   
   Spark counterpart example:
   ```
   sql_cmd = f"""INSERT OVERWRITE
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       PARTITION (not_a_field='red')
       SELECT animals,n_legs FROM  tmp_view
       """
   spark.sql(sql_cmd)
   ```
   gives:
   ```
   AnalysisException: PARTITION clause cannot contain a non-partition column name: not_a_field
   ```
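
   On the PyIceberg side, this corresponds to binding the expression against the table schema. A minimal sketch, assuming the `bind` visitor from `pyiceberg.expressions.visitors`:
   ```
   from pyiceberg.expressions import And, EqualTo, IsNull, Reference
   from pyiceberg.expressions.visitors import bind
   from pyiceberg.schema import Schema
   from pyiceberg.types import LongType, NestedField, StringType

   schema = Schema(
       NestedField(1, "n_legs", LongType(), required=False),
       NestedField(2, "animals", StringType(), required=False),
       NestedField(3, "color", StringType(), required=False),
   )

   # Binds: the fields exist and the literal matches the field type.
   bind(schema, And(EqualTo(Reference("color"), "red"), IsNull(Reference("animals"))), case_sensitive=True)

   # Fails to bind: the field does not exist in the schema.
   try:
       bind(schema, EqualTo(Reference("not_a_field"), "red"), case_sensitive=True)
   except ValueError as e:
       print(e)
   ```
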
   **Rule 4. If the expression specifies a field that is required in the Iceberg schema, that field must not appear in an IsNull predicate.**
   
   Spark counterpart example:
   ```
   # Create Spark Dataframe with non-nullable column
   from pyspark.sql.types import StructType, StructField, StringType, LongType
   data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
   schema = StructType([
       StructField("n_legs", LongType(), nullable=True),
       StructField("animals", StringType(), nullable=True),
       StructField("color", StringType(), nullable=False)  # Mark as 
non-nullable
   ])
   df_multicols = spark.createDataFrame(data_multicols, schema)
   
   # Create Iceberg Table with non-nullable column
   create_sql = """CREATE TABLE 
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols ( 
       n_legs bigint, 
       animals string,
       color string not NULL) 
   USING iceberg
   PARTITIONED BY (n_legs, color)
   
   """
   spark.sql(create_sql)
   
   # Insert Initial data
   df_multicols.createOrReplaceTempView("tmp_view")
   sql_cmd = f"""INSERT INTO
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       SELECT * FROM  tmp_view
       """
   spark.sql(sql_cmd)
   
   sql_cmd = f"""INSERT OVERWRITE
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       PARTITION (color=null)
       SELECT animals, n_legs FROM  tmp_view
       """
   spark.sql(sql_cmd)
   ```
   gives:
   ```
   AnalysisException: Cannot write incompatible data to table 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols':
   - Cannot safely cast 'n_legs': string to bigint
   - Cannot write nullable values to non-null column 'color'
   ```
   
   **Rule 5. The fields in the expression must be partition columns.**
   Spark counterpart example:
   ```
   sql_cmd = f"""INSERT OVERWRITE
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
       PARTITION (animals='pig')
       SELECT n_legs, color FROM  tmp_view
       """
   spark.sql(sql_cmd)
   ```
   gives:
   ```
   AnalysisException: PARTITION clause cannot contain a non-partition column name: animals
   ```
   **Rule 6. The Iceberg table fields specified in the expression must not use hidden partitioning; non-specified fields may.**
   
   Spark counterpart example:
   ```
   create_sql = """CREATE TABLE 
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
 ( 
       n_legs bigint, 
       animals string,
       color string
   ) 
   USING iceberg
   PARTITIONED BY (n_legs, truncate(color, 1))
   """
   spark.sql(create_sql)
   
   sql_cmd = f"""INSERT OVERWRITE
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
       PARTITION (color='red')
       SELECT n_legs, animals FROM  tmp_view
       """
   spark.sql(sql_cmd)
   
   ```
   gives:
   ```
   AnalysisException: PARTITION clause cannot contain a non-partition column name: color
   ```
   
   However, if we specify the other partition column, which uses the identity transform, it works:
   ```
   sql_cmd = f"""INSERT OVERWRITE
       
lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
       PARTITION (n_legs=1)
       SELECT color, animals FROM  tmp_view
       """
   spark.sql(sql_cmd)
   ```
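
   In PyIceberg terms, rules 5 and 6 amount to checking the partition spec. A minimal sketch for the table above (hypothetical helper; field ids chosen for illustration):
   ```
   from pyiceberg.partitioning import PartitionField, PartitionSpec
   from pyiceberg.schema import Schema
   from pyiceberg.transforms import IdentityTransform, TruncateTransform
   from pyiceberg.types import LongType, NestedField, StringType

   schema = Schema(
       NestedField(1, "n_legs", LongType(), required=False),
       NestedField(2, "animals", StringType(), required=False),
       NestedField(3, "color", StringType(), required=False),
   )
   # PARTITIONED BY (n_legs, truncate(color, 1))
   spec = PartitionSpec(
       PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="n_legs"),
       PartitionField(source_id=3, field_id=1001, transform=TruncateTransform(width=1), name="color_trunc"),
   )

   def validate_filter_field(name: str) -> None:
       """The field must be a partition source column (rule 5)
       whose partition transform is identity (rule 6)."""
       field_id = schema.find_field(name).field_id
       parts = [pf for pf in spec.fields if pf.source_id == field_id]
       if not parts:
           raise ValueError(f"Not a partition column: {name}")
       if not any(isinstance(pf.transform, IdentityTransform) for pf in parts):
           raise ValueError(f"Hidden (non-identity) partitioning on: {name}")

   validate_filter_field("n_legs")     # ok: identity partition field
   # validate_filter_field("animals")  # would raise: not a partition column (rule 5)
   # validate_filter_field("color")    # would raise: only a truncate partition (rule 6)
   ```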
   
   
   
   

