dramaticlly commented on code in PR #6714: URL: https://github.com/apache/iceberg/pull/6714#discussion_r1093848467
########## python/pyiceberg/expressions/literals.py: ########## @@ -328,6 +328,9 @@ def __le__(self, other: Any) -> bool: def __ge__(self, other: Any) -> bool: return self._value32 >= other + def __hash__(self) -> int: + return hash(self.value) Review Comment: curious, why not use `self._value32` here? ########## python/pyiceberg/conversions.py: ########## @@ -241,7 +242,7 @@ def _(primitive_type: DecimalType, value: Decimal) -> bytes: @singledispatch -def from_bytes(primitive_type: PrimitiveType, b: bytes) -> Union[bool, bytes, Decimal, float, int, str, uuid.UUID]: +def from_bytes(primitive_type: PrimitiveType, b: bytes) -> L: Review Comment: lovely ########## python/pyiceberg/expressions/visitors.py: ########## @@ -986,3 +989,246 @@ def expression_to_plain_format( # In the form of expr1 ∨ expr2 ∨ ... ∨ exprN visitor = ExpressionToPlainFormat(cast_int_to_datetime) return [visit(expression, visitor) for expression in expressions] + + +class _InclusiveMetricsEvaluator(BoundBooleanExpressionVisitor[bool]): + + struct: StructType + expr: BooleanExpression + + value_counts: Dict[int, int] + null_counts: Dict[int, int] + nan_counts: Dict[int, int] + lower_bounds: Dict[int, bytes] + upper_bounds: Dict[int, bytes] + + def __init__(self, schema: Schema, expr: BooleanExpression, case_sensitive: bool = True) -> None: + self.struct = schema.as_struct() + self.expr = bind(schema, rewrite_not(expr), case_sensitive) + + def eval(self, file: DataFile) -> bool: + """Test whether the file may contain records that match the expression.""" + + if file.record_count == 0: + return ROWS_CANNOT_MATCH + + if file.record_count < 0: + # we haven't implemented parsing record count from avro file and thus set record count -1 Review Comment: nit: this probably warrants a TODO? 
########## python/tests/expressions/test_visitors.py: ########## @@ -1472,3 +1478,211 @@ def test_dnf_to_dask(table_schema_simple: Schema) -> None: ), ) assert expression_to_plain_format(expr) == [[("foo", ">", "hello")], [("bar", "in", {1, 2, 3}), ("baz", "==", True)]] + + +@pytest.fixture +def schema_data_file() -> Schema: + return Schema( + NestedField(1, "all_nan", DoubleType(), required=True), + NestedField(2, "max_nan", DoubleType(), required=True), + NestedField(3, "min_max_nan", FloatType(), required=False), + NestedField(4, "all_nan_null_bounds", DoubleType(), required=True), + NestedField(5, "some_nan_correct_bounds", FloatType(), required=False), + ) + + +@pytest.fixture +def data_file() -> DataFile: + return DataFile( + file_path="file.avro", + file_format=FileFormat.PARQUET, + partition={}, + record_count=50, + file_size_in_bytes=3, + column_sizes={ + 1: 10, + 2: 10, + 3: 10, + 4: 10, + 5: 10, + }, + value_counts={ + 1: 10, + 2: 10, + 3: 10, + 4: 10, + 5: 10, + }, + null_value_counts={ + 1: 0, + 2: 0, + 3: 0, + 4: 0, + 5: 0, + }, + nan_value_counts={1: 10, 4: 10, 5: 5}, + lower_bounds={ + 1: to_bytes(DoubleType(), float("nan")), + 2: to_bytes(DoubleType(), 7), + 3: to_bytes(FloatType(), float("nan")), + 5: to_bytes(FloatType(), 7), + }, + upper_bounds={ + 1: to_bytes(DoubleType(), float("nan")), + 2: to_bytes(DoubleType(), float("nan")), + 3: to_bytes(FloatType(), float("nan")), + 5: to_bytes(FloatType(), 22), + }, + ) + + +def test_inclusive_metrics_evaluator_less_than_and_less_than_equal(schema_data_file: Schema, data_file: DataFile) -> None: + for operator in [LessThan, LessThanOrEqual]: + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("all_nan", 1)).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("max_nan", 1)).eval(data_file) + assert not should_read, "Should not match: 1 is smaller than lower bound" + + 
should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("max_nan", 10)).eval(data_file) + assert should_read, "Should match: 10 is larger than lower bound" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("min_max_nan", 1)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("all_nan_null_bounds", 1)).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("some_nan_correct_bounds", 1)).eval(data_file) + assert not should_read, "Should not match: 1 is smaller than lower bound" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("some_nan_correct_bounds", 10)).eval(data_file) + assert should_read, "Should match: 10 larger than lower bound" + + +def test_inclusive_metrics_evaluator_greater_than_and_greater_than_equal(schema_data_file: Schema, data_file: DataFile) -> None: + for operator in [GreaterThan, GreaterThanOrEqual]: + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("all_nan", 1)).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("max_nan", 1)).eval(data_file) + assert should_read, "Should match: upper bound is larger than 1" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("max_nan", 10)).eval(data_file) + assert should_read, "Should match: upper bound is larger than 10" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("min_max_nan", 1)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("all_nan_null_bounds", 1)).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = 
_InclusiveMetricsEvaluator(schema_data_file, operator("some_nan_correct_bounds", 1)).eval(data_file) + assert should_read, "Should match: 1 is smaller than upper bound" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("some_nan_correct_bounds", 10)).eval(data_file) + assert should_read, "Should match: 10 is smaller than upper bound" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, operator("all_nan", 30)).eval(data_file) + assert not should_read, "Should not match: 30 is greater than upper bound" + + +def test_inclusive_metrics_evaluator_equals(schema_data_file: Schema, data_file: DataFile) -> None: + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("all_nan", 1)).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("max_nan", 1)).eval(data_file) + assert not should_read, "Should not match: 1 is smaller than lower bound" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("max_nan", 10)).eval(data_file) + assert should_read, "Should match: 10 is within bounds" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("min_max_nan", 1)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("all_nan_null_bounds", 1)).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("some_nan_correct_bounds", 1)).eval(data_file) + assert not should_read, "Should not match: 1 is smaller than lower bound" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("some_nan_correct_bounds", 10)).eval(data_file) + assert should_read, "Should match: 10 is within bounds" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("all_nan", 30)).eval(data_file) + 
assert not should_read, "Should not match: 30 is greater than upper bound" + + +def test_inclusive_metrics_evaluator_not_equals(schema_data_file: Schema, data_file: DataFile) -> None: + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("all_nan", 1)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("max_nan", 10)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("max_nan", 10)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("min_max_nan", 1)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("all_nan_null_bounds", 1)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("some_nan_correct_bounds", 1)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("some_nan_correct_bounds", 10)).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, NotEqualTo("some_nan_correct_bounds", 30)).eval(data_file) + assert should_read, "Should match: no visibility" + + +def test_inclusive_metrics_evaluator_in(schema_data_file: Schema, data_file: DataFile) -> None: + should_read = _InclusiveMetricsEvaluator(schema_data_file, In("all_nan", (1, 10, 30))).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, In("max_nan", (1, 10, 30))).eval(data_file) + assert should_read, "Should match: 10 and 30 are greater than lower bound" + + should_read = 
_InclusiveMetricsEvaluator(schema_data_file, In("min_max_nan", (1, 10, 30))).eval(data_file) + assert should_read, "Should match: no visibility" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, In("all_nan_null_bounds", (1, 10, 30))).eval(data_file) + assert not should_read, "Should not match: all nan column doesn't contain number" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, In("some_nan_correct_bounds", (1, 10, 30))).eval(data_file) + assert should_read, "Should match: 10 within bounds" + + should_read = _InclusiveMetricsEvaluator(schema_data_file, In("some_nan_correct_bounds", (1, 30))).eval(data_file) + assert not should_read, "Should not match: 1 not within bounds" Review Comment: nit: Should not match: 1 and 30 are both out of bounds ########## python/pyiceberg/expressions/visitors.py: ########## @@ -986,3 +989,246 @@ def expression_to_plain_format( # In the form of expr1 ∨ expr2 ∨ ... ∨ exprN visitor = ExpressionToPlainFormat(cast_int_to_datetime) return [visit(expression, visitor) for expression in expressions] + + +class _InclusiveMetricsEvaluator(BoundBooleanExpressionVisitor[bool]): + + struct: StructType + expr: BooleanExpression + + value_counts: Dict[int, int] + null_counts: Dict[int, int] + nan_counts: Dict[int, int] + lower_bounds: Dict[int, bytes] + upper_bounds: Dict[int, bytes] + + def __init__(self, schema: Schema, expr: BooleanExpression, case_sensitive: bool = True) -> None: + self.struct = schema.as_struct() + self.expr = bind(schema, rewrite_not(expr), case_sensitive) + + def eval(self, file: DataFile) -> bool: + """Test whether the file may contain records that match the expression.""" + + if file.record_count == 0: + return ROWS_CANNOT_MATCH + + if file.record_count < 0: + # we haven't implemented parsing record count from avro file and thus set record count -1 + # when importing avro tables to iceberg tables. This should be updated once we implemented + # and set correct record count. 
+ return ROWS_MIGHT_MATCH + + self.value_counts = file.value_counts or EMPTY_DICT + self.null_counts = file.null_value_counts or EMPTY_DICT + self.nan_counts = file.nan_value_counts or EMPTY_DICT + self.lower_bounds = file.lower_bounds or EMPTY_DICT + self.upper_bounds = file.upper_bounds or EMPTY_DICT + + return visit(self.expr, self) + + def _contains_nulls_only(self, idx: int) -> bool: + return idx in self.value_counts and idx in self.null_counts and self.value_counts[idx] - self.null_counts[idx] == 0 Review Comment: I guess the column index for the value and null counts is always present even when count = 0, but it is not necessarily present for the nan count. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org