wuchong commented on code in PR #2502:
URL: https://github.com/apache/fluss/pull/2502#discussion_r2912642867


##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1230,4 +1230,91 @@ message PbTableStatsRespForBucket {
   // Per-column statistics, keyed by column index.
   // Only populated when target_columns is specified in the request.
   // repeated PbColumnStats column_stats = 10;
+}
+
+/**
+* Definition of predicate system related message
+*/
+
+// PbPredicateType constants (using int32 instead of enum for proto3 
compatibility)
+// LEAF = 0, COMPOUND = 1
+
+// Represents a predicate that can be serialized and transmitted across
+// languages
+message PbPredicate {
+  // See PbPredicateType constants: LEAF = 0, COMPOUND = 1
+  required int32 type = 1;
+  optional PbLeafPredicate leaf = 2;
+  optional PbCompoundPredicate compound = 3;
+}
+
+// Represents a leaf predicate that compares a field with literals
+message PbLeafPredicate {
+  // The function to apply (see PbLeafFunction constants: EQUAL=0 ... 
NOT_IN=12)
+  required int32 function = 1;
+  // The field reference
+  required PbFieldRef field_ref = 2;
+  // The literals to compare with
+  repeated PbLiteralValue literals = 3;
+}
+
+// Represents a field reference
+message PbFieldRef {
+  // The name of the field
+  required string field_name = 1;
+  // The data type of the field
+  required PbDataType data_type = 2;
+  // Optional field index in the row
+  required int32 field_index = 3;
+}
+
+// Represents a compound predicate that combines multiple predicates
+message PbCompoundPredicate {
+  // The function to apply (see PbCompoundFunction constants: AND=0, OR=1)
+  required int32 function = 1;
+  // The child predicates
+  repeated PbPredicate children = 2;
+}
+
+// PbLeafFunction constants (using int32 instead of enum for proto3 
compatibility)
+// EQUAL=0, NOT_EQUAL=1, LESS_THAN=2, LESS_OR_EQUAL=3, GREATER_THAN=4,
+// GREATER_OR_EQUAL=5, IS_NULL=6, IS_NOT_NULL=7, STARTS_WITH=8, CONTAINS=9,
+// END_WITH=10, IN=11, NOT_IN=12
+
+// PbCompoundFunction constants (using int32 instead of enum for proto3 
compatibility)
+// AND=0, OR=1
+
+// Represents a data type
+message PbDataType {
+  // See PbDataTypeRoot constants: BOOLEAN=0 ... BYTES=15
+  required int32 root = 1;
+  // For complex types like DECIMAL, additional parameters
+  optional int32 length = 2;
+  required bool nullable = 3;
+  optional int32 precision = 4;
+  optional int32 scale = 5;
+}
+
+// PbDataTypeRoot constants (using int32 instead of enum for proto3 
compatibility)
+// BOOLEAN=0, TINYINT=1, SMALLINT=2, INT=3, BIGINT=4, FLOAT=5, DOUBLE=6,
+// CHAR=7, VARCHAR=8, DECIMAL=9, DATE=10, TIME_WITHOUT_TIME_ZONE=11,
+// TIMESTAMP_WITHOUT_TIME_ZONE=12, TIMESTAMP_WITH_LOCAL_TIME_ZONE=13,
+// BINARY=14, BYTES=15
+
+// Represents a literal value
+message PbLiteralValue {
+  required PbDataType type = 1;
+  optional bool boolean_value = 2;
+  optional int32 int_value = 3;
+  optional int64 bigint_value = 4;
+  optional float float_value = 5;
+  optional double double_value = 6;
+  optional string string_value = 7;
+  optional bytes binary_value = 8;
+  optional int64 decimal_value = 9;    // Serialized decimal (compact mode)
+  optional bytes decimal_bytes = 10;   // Serialized decimal (non-compact mode)
+  // field numbers 11, 12 reserved for future use
+  optional int64 timestamp_millis_value = 13; // Epoch millis
+  optional int32 timestamp_nano_of_millis_value = 14 [default = 0]; // Nano of 
millis
+  required bool is_null = 15 [default = false];

Review Comment:
   nit: we can move `is_null` to be the field right after `type`, because they 
are both `required` fields, and all the other fields (including future ones) 
are literal value fields. 



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1230,4 +1230,91 @@ message PbTableStatsRespForBucket {
   // Per-column statistics, keyed by column index.
   // Only populated when target_columns is specified in the request.
   // repeated PbColumnStats column_stats = 10;
+}
+
+/**
+* Definition of predicate system related message
+*/
+
+// PbPredicateType constants (using int32 instead of enum for proto3 
compatibility)
+// LEAF = 0, COMPOUND = 1
+
+// Represents a predicate that can be serialized and transmitted across
+// languages
+message PbPredicate {
+  // See PbPredicateType constants: LEAF = 0, COMPOUND = 1
+  required int32 type = 1;
+  optional PbLeafPredicate leaf = 2;
+  optional PbCompoundPredicate compound = 3;
+}
+
+// Represents a leaf predicate that compares a field with literals
+message PbLeafPredicate {
+  // The function to apply (see PbLeafFunction constants: EQUAL=0 ... 
NOT_IN=12)
+  required int32 function = 1;
+  // The field reference
+  required PbFieldRef field_ref = 2;
+  // The literals to compare with
+  repeated PbLiteralValue literals = 3;
+}
+
+// Represents a field reference
+message PbFieldRef {
+  // The name of the field
+  required string field_name = 1;
+  // The data type of the field
+  required PbDataType data_type = 2;
+  // Optional field index in the row
+  required int32 field_index = 3;
+}
+
+// Represents a compound predicate that combines multiple predicates
+message PbCompoundPredicate {
+  // The function to apply (see PbCompoundFunction constants: AND=0, OR=1)
+  required int32 function = 1;
+  // The child predicates
+  repeated PbPredicate children = 2;
+}
+
+// PbLeafFunction constants (using int32 instead of enum for proto3 
compatibility)
+// EQUAL=0, NOT_EQUAL=1, LESS_THAN=2, LESS_OR_EQUAL=3, GREATER_THAN=4,
+// GREATER_OR_EQUAL=5, IS_NULL=6, IS_NOT_NULL=7, STARTS_WITH=8, CONTAINS=9,
+// END_WITH=10, IN=11, NOT_IN=12
+
+// PbCompoundFunction constants (using int32 instead of enum for proto3 
compatibility)
+// AND=0, OR=1
+
+// Represents a data type
+message PbDataType {
+  // See PbDataTypeRoot constants: BOOLEAN=0 ... BYTES=15
+  required int32 root = 1;
+  // For complex types like DECIMAL, additional parameters
+  optional int32 length = 2;
+  required bool nullable = 3;
+  optional int32 precision = 4;
+  optional int32 scale = 5;
+}
+
+// PbDataTypeRoot constants (using int32 instead of enum for proto3 
compatibility)
+// BOOLEAN=0, TINYINT=1, SMALLINT=2, INT=3, BIGINT=4, FLOAT=5, DOUBLE=6,
+// CHAR=7, VARCHAR=8, DECIMAL=9, DATE=10, TIME_WITHOUT_TIME_ZONE=11,
+// TIMESTAMP_WITHOUT_TIME_ZONE=12, TIMESTAMP_WITH_LOCAL_TIME_ZONE=13,
+// BINARY=14, BYTES=15
+
+// Represents a literal value
+message PbLiteralValue {
+  required PbDataType type = 1;
+  optional bool boolean_value = 2;
+  optional int32 int_value = 3;
+  optional int64 bigint_value = 4;
+  optional float float_value = 5;
+  optional double double_value = 6;
+  optional string string_value = 7;
+  optional bytes binary_value = 8;
+  optional int64 decimal_value = 9;    // Serialized decimal (compact mode)
+  optional bytes decimal_bytes = 10;   // Serialized decimal (non-compact mode)
+  // field numbers 11, 12 reserved for future use
+  optional int64 timestamp_millis_value = 13; // Epoch millis
+  optional int32 timestamp_nano_of_millis_value = 14 [default = 0]; // Nano of 
millis

Review Comment:
   Do not use `default`, because proto3 removes support for the `default` 
keyword.



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1230,4 +1230,91 @@ message PbTableStatsRespForBucket {
   // Per-column statistics, keyed by column index.
   // Only populated when target_columns is specified in the request.
   // repeated PbColumnStats column_stats = 10;
+}
+
+/**
+* Definition of predicate system related message
+*/
+
+// PbPredicateType constants (using int32 instead of enum for proto3 
compatibility)
+// LEAF = 0, COMPOUND = 1
+
+// Represents a predicate that can be serialized and transmitted across
+// languages
+message PbPredicate {
+  // See PbPredicateType constants: LEAF = 0, COMPOUND = 1
+  required int32 type = 1;
+  optional PbLeafPredicate leaf = 2;
+  optional PbCompoundPredicate compound = 3;
+}
+
+// Represents a leaf predicate that compares a field with literals
+message PbLeafPredicate {
+  // The function to apply (see PbLeafFunction constants: EQUAL=0 ... 
NOT_IN=12)
+  required int32 function = 1;
+  // The field reference
+  required PbFieldRef field_ref = 2;
+  // The literals to compare with
+  repeated PbLiteralValue literals = 3;
+}
+
+// Represents a field reference
+message PbFieldRef {
+  // The name of the field
+  required string field_name = 1;
+  // The data type of the field
+  required PbDataType data_type = 2;
+  // Optional field index in the row
+  required int32 field_index = 3;
+}
+
+// Represents a compound predicate that combines multiple predicates
+message PbCompoundPredicate {
+  // The function to apply (see PbCompoundFunction constants: AND=0, OR=1)
+  required int32 function = 1;
+  // The child predicates
+  repeated PbPredicate children = 2;
+}
+
+// PbLeafFunction constants (using int32 instead of enum for proto3 
compatibility)
+// EQUAL=0, NOT_EQUAL=1, LESS_THAN=2, LESS_OR_EQUAL=3, GREATER_THAN=4,
+// GREATER_OR_EQUAL=5, IS_NULL=6, IS_NOT_NULL=7, STARTS_WITH=8, CONTAINS=9,
+// END_WITH=10, IN=11, NOT_IN=12
+
+// PbCompoundFunction constants (using int32 instead of enum for proto3 
compatibility)
+// AND=0, OR=1
+
+// Represents a data type
+message PbDataType {
+  // See PbDataTypeRoot constants: BOOLEAN=0 ... BYTES=15
+  required int32 root = 1;
+  // For complex types like DECIMAL, additional parameters
+  optional int32 length = 2;
+  required bool nullable = 3;
+  optional int32 precision = 4;
+  optional int32 scale = 5;
+}
+
+// PbDataTypeRoot constants (using int32 instead of enum for proto3 
compatibility)
+// BOOLEAN=0, TINYINT=1, SMALLINT=2, INT=3, BIGINT=4, FLOAT=5, DOUBLE=6,
+// CHAR=7, VARCHAR=8, DECIMAL=9, DATE=10, TIME_WITHOUT_TIME_ZONE=11,
+// TIMESTAMP_WITHOUT_TIME_ZONE=12, TIMESTAMP_WITH_LOCAL_TIME_ZONE=13,
+// BINARY=14, BYTES=15
+
+// Represents a literal value
+message PbLiteralValue {
+  required PbDataType type = 1;
+  optional bool boolean_value = 2;
+  optional int32 int_value = 3;
+  optional int64 bigint_value = 4;
+  optional float float_value = 5;
+  optional double double_value = 6;
+  optional string string_value = 7;
+  optional bytes binary_value = 8;
+  optional int64 decimal_value = 9;    // Serialized decimal (compact mode)
+  optional bytes decimal_bytes = 10;   // Serialized decimal (non-compact mode)
+  // field numbers 11, 12 reserved for future use

Review Comment:
   Is supporting fields 11 and 12 technically challenging? My understanding is 
that these represent time-related fields. I recommend enabling support for them 
directly. If that is not feasible, I suggest relocating 
`timestamp_millis_value` and `timestamp_nano_of_millis_value` to positions 11 
and 12 respectively to maintain schema consistency.



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1230,4 +1230,91 @@ message PbTableStatsRespForBucket {
   // Per-column statistics, keyed by column index.
   // Only populated when target_columns is specified in the request.
   // repeated PbColumnStats column_stats = 10;
+}
+
+/**
+* Definition of predicate system related message
+*/
+
+// PbPredicateType constants (using int32 instead of enum for proto3 
compatibility)
+// LEAF = 0, COMPOUND = 1
+
+// Represents a predicate that can be serialized and transmitted across
+// languages
+message PbPredicate {
+  // See PbPredicateType constants: LEAF = 0, COMPOUND = 1
+  required int32 type = 1;
+  optional PbLeafPredicate leaf = 2;
+  optional PbCompoundPredicate compound = 3;
+}
+
+// Represents a leaf predicate that compares a field with literals
+message PbLeafPredicate {
+  // The function to apply (see PbLeafFunction constants: EQUAL=0 ... 
NOT_IN=12)
+  required int32 function = 1;
+  // The field reference
+  required PbFieldRef field_ref = 2;

Review Comment:
   Could we simplify field references by using the field id (an `int32`) 
directly? This approach would significantly reduce complexity and serialization 
overhead, since field names, data types, and indices can be efficiently derived 
from the Schema using the schema_id.



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1230,4 +1230,91 @@ message PbTableStatsRespForBucket {
   // Per-column statistics, keyed by column index.
   // Only populated when target_columns is specified in the request.
   // repeated PbColumnStats column_stats = 10;
+}
+
+/**
+* Definition of predicate system related message
+*/
+
+// PbPredicateType constants (using int32 instead of enum for proto3 
compatibility)
+// LEAF = 0, COMPOUND = 1
+
+// Represents a predicate that can be serialized and transmitted across
+// languages
+message PbPredicate {
+  // See PbPredicateType constants: LEAF = 0, COMPOUND = 1
+  required int32 type = 1;
+  optional PbLeafPredicate leaf = 2;
+  optional PbCompoundPredicate compound = 3;
+}
+
+// Represents a leaf predicate that compares a field with literals
+message PbLeafPredicate {
+  // The function to apply (see PbLeafFunction constants: EQUAL=0 ... 
NOT_IN=12)
+  required int32 function = 1;
+  // The field reference
+  required PbFieldRef field_ref = 2;
+  // The literals to compare with
+  repeated PbLiteralValue literals = 3;
+}
+
+// Represents a field reference
+message PbFieldRef {
+  // The name of the field
+  required string field_name = 1;
+  // The data type of the field
+  required PbDataType data_type = 2;
+  // Optional field index in the row
+  required int32 field_index = 3;
+}
+
+// Represents a compound predicate that combines multiple predicates
+message PbCompoundPredicate {
+  // The function to apply (see PbCompoundFunction constants: AND=0, OR=1)
+  required int32 function = 1;
+  // The child predicates
+  repeated PbPredicate children = 2;
+}
+
+// PbLeafFunction constants (using int32 instead of enum for proto3 
compatibility)
+// EQUAL=0, NOT_EQUAL=1, LESS_THAN=2, LESS_OR_EQUAL=3, GREATER_THAN=4,
+// GREATER_OR_EQUAL=5, IS_NULL=6, IS_NOT_NULL=7, STARTS_WITH=8, CONTAINS=9,
+// END_WITH=10, IN=11, NOT_IN=12
+
+// PbCompoundFunction constants (using int32 instead of enum for proto3 
compatibility)
+// AND=0, OR=1
+
+// Represents a data type
+message PbDataType {
+  // See PbDataTypeRoot constants: BOOLEAN=0 ... BYTES=15
+  required int32 root = 1;
+  // For complex types like DECIMAL, additional parameters
+  optional int32 length = 2;
+  required bool nullable = 3;
+  optional int32 precision = 4;
+  optional int32 scale = 5;
+}
+
+// PbDataTypeRoot constants (using int32 instead of enum for proto3 
compatibility)
+// BOOLEAN=0, TINYINT=1, SMALLINT=2, INT=3, BIGINT=4, FLOAT=5, DOUBLE=6,
+// CHAR=7, VARCHAR=8, DECIMAL=9, DATE=10, TIME_WITHOUT_TIME_ZONE=11,
+// TIMESTAMP_WITHOUT_TIME_ZONE=12, TIMESTAMP_WITH_LOCAL_TIME_ZONE=13,
+// BINARY=14, BYTES=15
+
+// Represents a literal value
+message PbLiteralValue {
+  required PbDataType type = 1;

Review Comment:
   I think the data type of the literal should be the same as the field 
reference type, so there is no need to introduce a `PbDataType` for 
`PbLiteralValue`? 
   
   Besides, introducing a simple int literal_type field would be beneficial to 
efficiently indicate which specific value field contains the literal, 
streamlining the extraction logic without iterating all the fields, and making 
the `PbLiteralValue` self-contained. 



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.util;
+
+import org.apache.fluss.predicate.And;
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.Contains;
+import org.apache.fluss.predicate.EndsWith;
+import org.apache.fluss.predicate.Equal;
+import org.apache.fluss.predicate.GreaterOrEqual;
+import org.apache.fluss.predicate.GreaterThan;
+import org.apache.fluss.predicate.In;
+import org.apache.fluss.predicate.IsNotNull;
+import org.apache.fluss.predicate.IsNull;
+import org.apache.fluss.predicate.LeafFunction;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.LessOrEqual;
+import org.apache.fluss.predicate.LessThan;
+import org.apache.fluss.predicate.NotEqual;
+import org.apache.fluss.predicate.NotIn;
+import org.apache.fluss.predicate.Or;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateVisitor;
+import org.apache.fluss.predicate.StartsWith;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.rpc.messages.PbCompoundPredicate;
+import org.apache.fluss.rpc.messages.PbDataType;
+import org.apache.fluss.rpc.messages.PbFieldRef;
+import org.apache.fluss.rpc.messages.PbLeafPredicate;
+import org.apache.fluss.rpc.messages.PbLiteralValue;
+import org.apache.fluss.rpc.messages.PbPredicate;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utils for converting Predicate to PbPredicate and vice versa. */
+public class PredicateMessageUtils {
+
+    // 
-------------------------------------------------------------------------
+    //  Deserialization: PbPredicate -> Predicate
+    // 
-------------------------------------------------------------------------
+
+    public static Predicate toPredicate(PbPredicate pbPredicate) {
+        PredicateType type = PredicateType.fromValue(pbPredicate.getType());
+        switch (type) {
+            case LEAF:
+                return toLeafPredicate(pbPredicate.getLeaf());
+            case COMPOUND:
+                return toCompoundPredicate(pbPredicate.getCompound());
+            default:
+                throw new IllegalArgumentException("Unknown predicate type: " 
+ type);
+        }
+    }
+
+    public static CompoundPredicate toCompoundPredicate(PbCompoundPredicate 
pbCompound) {
+        List<Predicate> children =
+                pbCompound.getChildrensList().stream()
+                        .map(PredicateMessageUtils::toPredicate)
+                        .collect(Collectors.toList());
+        return new CompoundPredicate(
+                
CompoundFunctionCode.fromValue(pbCompound.getFunction()).getFunction(), 
children);
+    }
+
+    private static LeafPredicate toLeafPredicate(PbLeafPredicate pbLeaf) {
+        PbFieldRef fieldRef = pbLeaf.getFieldRef();
+        List<Object> literals =
+                pbLeaf.getLiteralsList().stream()
+                        .map(PredicateMessageUtils::toLiteralValue)
+                        .collect(Collectors.toList());
+
+        return new LeafPredicate(
+                LeafFunctionCode.fromValue(pbLeaf.getFunction()).getFunction(),
+                toDataType(fieldRef.getDataType()),
+                fieldRef.getFieldIndex(),
+                fieldRef.getFieldName(),
+                literals);
+    }
+
+    private static DataType toDataType(PbDataType pbType) {
+        DataTypeRoot root = 
DataTypeRootCode.fromValue(pbType.getRoot()).getDataTypeRoot();
+        boolean nullable = pbType.isNullable();
+        switch (root) {
+            case BOOLEAN:
+                return new BooleanType(nullable);
+            case TINYINT:
+                return new TinyIntType(nullable);
+            case SMALLINT:
+                return new SmallIntType(nullable);
+            case INTEGER:
+                return new IntType(nullable);
+            case BIGINT:
+                return new BigIntType(nullable);
+            case FLOAT:
+                return new FloatType(nullable);
+            case DOUBLE:
+                return new DoubleType(nullable);
+            case CHAR:
+                return new CharType(nullable, pbType.getLength());
+            case STRING:
+                return new StringType(nullable);
+            case DECIMAL:
+                return new DecimalType(nullable, pbType.getPrecision(), 
pbType.getScale());
+            case DATE:
+                return new DateType(nullable);
+            case TIME_WITHOUT_TIME_ZONE:
+                return new TimeType(nullable, pbType.getPrecision());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return new TimestampType(nullable, pbType.getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return new LocalZonedTimestampType(nullable, 
pbType.getPrecision());
+            case BINARY:
+                return new BinaryType(pbType.getLength());
+            case BYTES:
+                return new BytesType(nullable);
+            default:
+                throw new IllegalArgumentException("Unknown data type root: " 
+ root);
+        }
+    }
+
+    private static Object toLiteralValue(PbLiteralValue pbLiteral) {
+        if (pbLiteral.isIsNull()) {
+            return null;
+        }
+        DataTypeRoot root =
+                
DataTypeRootCode.fromValue(pbLiteral.getType().getRoot()).getDataTypeRoot();
+        switch (root) {
+            case BOOLEAN:
+                return pbLiteral.isBooleanValue();
+            case TINYINT:
+                return (byte) pbLiteral.getIntValue();
+            case SMALLINT:
+                return (short) pbLiteral.getIntValue();
+            case INTEGER:
+                return pbLiteral.getIntValue();
+            case BIGINT:
+                return pbLiteral.getBigintValue();
+            case FLOAT:
+                return pbLiteral.getFloatValue();
+            case DOUBLE:
+                return pbLiteral.getDoubleValue();
+            case CHAR:
+            case STRING:
+                String stringValue = pbLiteral.getStringValue();
+                return stringValue == null ? null : 
BinaryString.fromString(stringValue);
+            case DECIMAL:
+                if (pbLiteral.hasDecimalBytes()) {
+                    return Decimal.fromUnscaledBytes(
+                            pbLiteral.getDecimalBytes(),
+                            pbLiteral.getType().getPrecision(),
+                            pbLiteral.getType().getScale());
+                } else {
+                    return Decimal.fromUnscaledLong(
+                            pbLiteral.getDecimalValue(),
+                            pbLiteral.getType().getPrecision(),
+                            pbLiteral.getType().getScale());
+                }
+            case DATE:
+                return LocalDate.ofEpochDay(pbLiteral.getBigintValue());
+            case TIME_WITHOUT_TIME_ZONE:
+                return LocalTime.ofNanoOfDay(pbLiteral.getIntValue() * 
1_000_000L);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return TimestampNtz.fromMillis(
+                        pbLiteral.getTimestampMillisValue(),
+                        pbLiteral.getTimestampNanoOfMillisValue());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return TimestampLtz.fromEpochMillis(
+                        pbLiteral.getTimestampMillisValue(),
+                        pbLiteral.getTimestampNanoOfMillisValue());
+            case BINARY:
+            case BYTES:
+                return pbLiteral.getBinaryValue();
+            default:
+                throw new IllegalArgumentException("Unknown literal value 
type: " + root);
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Serialization: Predicate -> PbPredicate
+    // 
-------------------------------------------------------------------------
+
+    public static PbPredicate toPbPredicate(Predicate predicate) {
+        return predicate.visit(
+                new PredicateVisitor<PbPredicate>() {
+                    @Override
+                    public PbPredicate visit(LeafPredicate predicate) {
+                        PbFieldRef fieldRef = new PbFieldRef();
+                        fieldRef.setDataType(toPbDataType(predicate.type()));
+                        fieldRef.setFieldIndex(predicate.index());
+                        fieldRef.setFieldName(predicate.fieldName());
+
+                        PbLeafPredicate pbLeaf = new PbLeafPredicate();
+                        pbLeaf.setFunction(
+                                
LeafFunctionCode.fromFunction(predicate.function()).getValue());
+                        pbLeaf.setFieldRef(fieldRef);
+
+                        List<PbLiteralValue> literals = new ArrayList<>();
+                        for (Object literal : predicate.literals()) {
+                            literals.add(toPbLiteralValue(predicate.type(), 
literal));
+                        }
+                        pbLeaf.addAllLiterals(literals);
+
+                        PbPredicate pbPredicate = new PbPredicate();
+                        pbPredicate.setType(PredicateType.LEAF.getValue());
+                        pbPredicate.setLeaf(pbLeaf);
+                        return pbPredicate;
+                    }
+
+                    @Override
+                    public PbPredicate visit(CompoundPredicate predicate) {
+                        PbCompoundPredicate pbCompound = new 
PbCompoundPredicate();
+                        pbCompound.setFunction(
+                                
CompoundFunctionCode.fromFunction(predicate.function()).getValue());
+                        pbCompound.addAllChildrens(
+                                predicate.children().stream()
+                                        
.map(PredicateMessageUtils::toPbPredicate)
+                                        .collect(Collectors.toList()));
+
+                        PbPredicate pbPredicate = new PbPredicate();
+                        pbPredicate.setType(PredicateType.COMPOUND.getValue());
+                        pbPredicate.setCompound(pbCompound);
+                        return pbPredicate;
+                    }
+                });
+    }
+
+    private static PbDataType toPbDataType(DataType dataType) {
+        PbDataType pbDataType = new PbDataType();
+        pbDataType.setNullable(dataType.isNullable());
+        
pbDataType.setRoot(DataTypeRootCode.fromDataTypeRoot(dataType.getTypeRoot()).getValue());
+
+        // Set type-specific parameters
+        if (dataType instanceof CharType) {
+            pbDataType.setLength(((CharType) dataType).getLength());
+        } else if (dataType instanceof DecimalType) {
+            pbDataType.setPrecision(((DecimalType) dataType).getPrecision());
+            pbDataType.setScale(((DecimalType) dataType).getScale());
+        } else if (dataType instanceof TimeType) {
+            pbDataType.setPrecision(((TimeType) dataType).getPrecision());
+        } else if (dataType instanceof TimestampType) {
+            pbDataType.setPrecision(((TimestampType) dataType).getPrecision());
+        } else if (dataType instanceof LocalZonedTimestampType) {
+            pbDataType.setPrecision(((LocalZonedTimestampType) 
dataType).getPrecision());
+        } else if (dataType instanceof BinaryType) {
+            pbDataType.setLength(((BinaryType) dataType).getLength());
+        }
+        return pbDataType;
+    }
+
+    private static PbLiteralValue toPbLiteralValue(DataType type, Object 
literal) {
+        PbLiteralValue pbLiteral = new PbLiteralValue();
+        pbLiteral.setType(toPbDataType(type));
+        if (literal == null) {
+            pbLiteral.setIsNull(true);
+            return pbLiteral;
+        }
+        pbLiteral.setIsNull(false);
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case STRING:
+                pbLiteral.setStringValue(literal.toString());
+                break;
+            case BOOLEAN:
+                pbLiteral.setBooleanValue((Boolean) literal);
+                break;
+            case BINARY:
+            case BYTES:
+                pbLiteral.setBinaryValue((byte[]) literal);
+                break;
+            case DECIMAL:
+                Decimal decimal = (Decimal) literal;
+                if (decimal.isCompact()) {
+                    pbLiteral.setDecimalValue(decimal.toUnscaledLong());
+                } else {
+                    pbLiteral.setDecimalBytes(decimal.toUnscaledBytes());
+                }
+                break;
+            case TINYINT:
+                pbLiteral.setIntValue((Byte) literal);
+                break;
+            case SMALLINT:
+                pbLiteral.setIntValue((Short) literal);
+                break;
+            case INTEGER:
+                pbLiteral.setIntValue((Integer) literal);
+                break;
+            case DATE:
+                pbLiteral.setBigintValue(((LocalDate) literal).toEpochDay());
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                pbLiteral.setIntValue((int) (((LocalTime) 
literal).toNanoOfDay() / 1_000_000L));
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                pbLiteral.setTimestampMillisValue(((TimestampNtz) 
literal).getMillisecond());
+                pbLiteral.setTimestampNanoOfMillisValue(
+                        ((TimestampNtz) literal).getNanoOfMillisecond());
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                pbLiteral.setTimestampMillisValue(((TimestampLtz) 
literal).getEpochMillisecond());
+                pbLiteral.setTimestampNanoOfMillisValue(
+                        ((TimestampLtz) literal).getNanoOfMillisecond());
+                break;
+            case BIGINT:
+                pbLiteral.setBigintValue((Long) literal);
+                break;
+            case FLOAT:
+                pbLiteral.setFloatValue((Float) literal);
+                break;
+            case DOUBLE:
+                pbLiteral.setDoubleValue((Double) literal);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown data type: " + 
type.getTypeRoot());
+        }
+        return pbLiteral;
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Proto int32 <-> domain object mapping enums
+    // 
-------------------------------------------------------------------------
+
+    /** Maps PbPredicate.type int32 values to predicate kinds. */
+    private enum PredicateType {
+        LEAF(0),
+        COMPOUND(1);
+
+        private final int value;
+        private static final Map<Integer, PredicateType> VALUE_MAP = new 
HashMap<>();

Review Comment:
   nit: I think using a `PredicateType[]` array would be more efficient than a 
`HashMap`, because all keys are consecutive integers starting from 0. This also 
applies to `LeafFunctionCode#VALUE_MAP`, `CompoundFunctionCode#VALUE_MAP`, and 
`DataTypeRootCode#VALUE_MAP`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to