This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a9e4ffdbd96eb5eadb775b30a2fe7252549bfa27
Author: Ceng <441651...@qq.com>
AuthorDate: Tue Jul 30 10:21:28 2024 +0800

    [Feature][external catalog/lakesoul] LakeSoul Catalog support Filter 
Pushdown & Cdc data handling & S3 data access  (#37979)
    
    ## Proposed changes
    
    Issue Number: close #37978
    
    ---------
    
    Signed-off-by: zenghua <441651...@qq.com>
---
 fe/be-java-extensions/lakesoul-scanner/pom.xml     |  22 +-
 .../apache/doris/lakesoul/LakeSoulJniScanner.java  |  82 +++-
 .../org/apache/doris/lakesoul/LakeSoulUtils.java   |  30 +-
 .../apache/doris/lakesoul/arrow/ArrowUtils.java    |   8 +-
 .../lakesoul/arrow/LakeSoulArrowJniScanner.java    |  44 +-
 fe/fe-core/pom.xml                                 |  44 +-
 .../lakesoul/LakeSoulExternalCatalog.java          |  22 +-
 .../datasource/lakesoul/LakeSoulExternalTable.java |  22 +-
 .../doris/datasource/lakesoul/LakeSoulUtils.java   | 535 +++++++++++++++++++++
 .../lakesoul/source/LakeSoulScanNode.java          | 158 +++++-
 .../doris/nereids/rules/analysis/BindRelation.java |   1 +
 .../datasource/lakesoul/LakeSoulPredicateTest.java | 280 +++++++++++
 fe/pom.xml                                         |   4 +-
 regression-test/conf/regression-conf.groovy        |  11 +
 .../lakesoul/test_lakesoul_filter.out              |   8 +
 .../pipeline/external/conf/regression-conf.groovy  |   4 +
 .../lakesoul/test_lakesoul_catalog.groovy          |  23 +-
 .../lakesoul/test_lakesoul_filter.groovy           |  58 +++
 .../lakesoul/test_external_table_lakesoul.groovy   |  24 +-
 19 files changed, 1269 insertions(+), 111 deletions(-)

diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml 
b/fe/be-java-extensions/lakesoul-scanner/pom.xml
index cbbb473483e..24d7efc7614 100644
--- a/fe/be-java-extensions/lakesoul-scanner/pom.xml
+++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml
@@ -47,21 +47,6 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-vector</artifactId>
-            <version>${arrow.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-memory-unsafe</artifactId>
-            <version>${arrow.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-c-data</artifactId>
-            <version>${arrow.version}</version>
-        </dependency>
 
         <!-- scala deps -->
         <dependency>
@@ -85,7 +70,8 @@ under the License.
         <dependency>
             <groupId>com.dmetasoul</groupId>
             <artifactId>lakesoul-io-java</artifactId>
-            <version>2.5.4</version>
+            <version>${lakesoul.version}</version>
+            <classifier>shaded</classifier>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
@@ -99,10 +85,6 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>org.apache.arrow</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>org.antlr</groupId>
                     <artifactId>antlr4-runtime</artifactId>
diff --git 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
index 3dfbff756db..a7ac785d1fb 100644
--- 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
+++ 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
@@ -17,25 +17,30 @@
 
 package org.apache.doris.lakesoul;
 
-import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
-import org.apache.doris.lakesoul.parquet.ParquetFilter;
 
 import com.dmetasoul.lakesoul.LakeSoulArrowReader;
 import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
+import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.proto.Plan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
+    private static final Logger LOG = 
LoggerFactory.getLogger(LakeSoulJniScanner.class);
 
     private final Map<String, String> params;
 
@@ -60,6 +65,9 @@ public class LakeSoulJniScanner extends 
LakeSoulArrowJniScanner {
         withAllocator(nativeIOReader.getAllocator());
         nativeIOReader.setBatchSize(batchSize);
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("opening LakeSoulJniScanner with params={}", params);
+        }
         // add files
         for (String file : 
params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
             nativeIOReader.addFile(file);
@@ -72,19 +80,39 @@ public class LakeSoulJniScanner extends 
LakeSoulArrowJniScanner {
                     
Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
         }
 
-        Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
+        String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}");
+        Map<String, String> optionsMap = new ObjectMapper().readValue(
+                options, new TypeReference<Map<String, String>>() {}
+        );
+        String base64Predicate = 
optionsMap.get(LakeSoulUtils.SUBSTRAIT_PREDICATE);
+        if (base64Predicate != null) {
+            Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("push predicate={}", predicate);
+            }
+            nativeIOReader.addFilterProto(predicate);
+        }
+
+        for (String key : LakeSoulUtils.OBJECT_STORE_OPTIONS) {
+            String value = optionsMap.get(key);
+            if (key != null) {
+                nativeIOReader.setObjectStoreOption(key, value);
+            }
+        }
+
+        Schema tableSchema = 
Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
         String[] requiredFieldNames = 
params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM);
 
         List<Field> requiredFields = new ArrayList<>();
         for (String fieldName : requiredFieldNames) {
-            requiredFields.add(schema.findField(fieldName));
+            requiredFields.add(tableSchema.findField(fieldName));
         }
 
         requiredSchema = new Schema(requiredFields);
 
         nativeIOReader.setSchema(requiredSchema);
 
-        HashSet<String> partitionColumn = new HashSet<>();
+        List<Field> partitionFields = new ArrayList<>();
         for (String partitionKV : 
params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "")
                 .split(LakeSoulUtils.LIST_DELIM)) {
             if (partitionKV.isEmpty()) {
@@ -94,17 +122,15 @@ public class LakeSoulJniScanner extends 
LakeSoulArrowJniScanner {
             if (kv.length != 2) {
                 throw new IllegalArgumentException("Invalid partition column = 
" + partitionKV);
             }
-            partitionColumn.add(kv[0]);
+            nativeIOReader.setDefaultColumnValue(kv[0], kv[1]);
+            partitionFields.add(tableSchema.findField(kv[0]));
+        }
+        if (!partitionFields.isEmpty()) {
+            nativeIOReader.setPartitionSchema(new Schema(partitionFields));
         }
 
         initTableInfo(params);
 
-        for (ScanPredicate predicate : predicates) {
-            if (!partitionColumn.contains(predicate.columName)) {
-                
nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString());
-            }
-        }
-
         nativeIOReader.initializeReader();
         lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, 
awaitTimeout);
     }
@@ -156,4 +182,28 @@ public class LakeSoulJniScanner extends 
LakeSoulArrowJniScanner {
             currentBatch.close();
         }
     }
+
+    public static void main(String[] args) throws IOException {
+        HashMap<String, String> params = new HashMap<>();
+        params.put("required_fields", "r_regionkey;r_name;r_comment");
+        params.put("primary_keys", "r_regionkey;r_name");
+        params.put("query_id", "e9d075a6500a4cac-b94630fd4b30c171");
+        params.put("file_paths",
+                
"file:/Users/ceng/Documents/GitHub/LakeSoul/rust/lakesoul-datafusion/"
+                    + "default/region/part-RzmUvDFtYV8ceb3J_0000.parquet"
+        );
+        params.put("options", "{}");
+        params.put("table_schema",
+                "{\"fields\":["
+                    + 
"{\"name\":\"r_regionkey\",\"type\":{\"name\":\"int\",\"isSigned\":true,\"bitWidth\":64},"
+                    + "\"nullable\":false,\"children\":[]},"
+                    + 
"{\"name\":\"r_name\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]},"
+                    + 
"{\"name\":\"r_comment\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]}"
+                    + "],"
+                    + "\"metadata\":null}");
+        params.put("partition_descs", "");
+        LakeSoulJniScanner scanner = new LakeSoulJniScanner(1024, params);
+        scanner.open();
+        System.out.println(scanner.getNext());
+    }
 }
diff --git 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
index 6c7f88f3ab3..ca07a81d0da 100644
--- 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
+++ 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
@@ -17,13 +17,29 @@
 
 package org.apache.doris.lakesoul;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class LakeSoulUtils {
-    public static String FILE_NAMES = "file_paths";
-    public static String PRIMARY_KEYS = "primary_keys";
-    public static String SCHEMA_JSON = "table_schema";
-    public static String PARTITION_DESC = "partition_descs";
-    public static String REQUIRED_FIELDS = "required_fields";
+    public static final String FILE_NAMES = "file_paths";
+    public static final String PRIMARY_KEYS = "primary_keys";
+    public static final String SCHEMA_JSON = "table_schema";
+    public static final String PARTITION_DESC = "partition_descs";
+    public static final String REQUIRED_FIELDS = "required_fields";
+    public static final String OPTIONS = "options";
+    public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+    public static final String LIST_DELIM = ";";
+    public static final String PARTITIONS_KV_DELIM = "=";
+
+    public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+    public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+    public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+    public static final String FS_S3A_PATH_STYLE_ACCESS = 
"fs.s3a.path.style.access";
 
-    public static String LIST_DELIM = ";";
-    public static String PARTITIONS_KV_DELIM = "=";
+    public static final List<String> OBJECT_STORE_OPTIONS = Arrays.asList(
+            FS_S3A_ACCESS_KEY,
+            FS_S3A_SECRET_KEY,
+            FS_S3A_ENDPOINT,
+            FS_S3A_PATH_STYLE_ACCESS
+    );
 }
diff --git 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
index 3ad28ba783a..94ac32935e8 100644
--- 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
+++ 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
@@ -20,10 +20,10 @@ package org.apache.doris.lakesoul.arrow;
 import org.apache.doris.common.jni.utils.OffHeap;
 import org.apache.doris.common.jni.utils.TypeNativeBytes;
 
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-import org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.util.Preconditions;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
diff --git 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
index 320d653a20a..3c73c2f1ab4 100644
--- 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
+++ 
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
@@ -23,28 +23,28 @@ import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.VectorTable;
 
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliTZVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampNanoTZVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
-import org.apache.arrow.vector.TimeStampSecTZVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.complex.StructVector;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.memory.BufferAllocator;
+import com.lakesoul.shaded.org.apache.arrow.vector.BitVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DateDayVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DecimalVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.FieldVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.ValueVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VarCharVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.ListVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.StructVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index bac2346185d..8021f2a3b18 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -568,7 +568,7 @@ under the License.
         <dependency>
             <groupId>com.dmetasoul</groupId>
             <artifactId>lakesoul-common</artifactId>
-            <version>2.5.4</version>
+            <version>${lakesoul.version}</version>
             <classifier>shaded</classifier>
             <exclusions>
                 <exclusion>
@@ -577,6 +577,46 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.dmetasoul</groupId>
+            <artifactId>lakesoul-io-java</artifactId>
+            <version>${lakesoul.version}</version>
+            <classifier>shaded</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr4-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.iceberg</groupId>
@@ -1220,4 +1260,4 @@ under the License.
             </extension>
         </extensions>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
index dd8342ad660..e813ac2fc97 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.lakesoul;
 
+
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -25,6 +26,7 @@ import org.apache.doris.datasource.property.PropertyConverter;
 
 import com.dmetasoul.lakesoul.meta.DBManager;
 import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
 import com.google.common.collect.Lists;
 
@@ -33,7 +35,7 @@ import java.util.Map;
 
 public class LakeSoulExternalCatalog extends ExternalCatalog {
 
-    private DBManager dbManager;
+    private DBManager lakesoulMetadataManager;
 
     private final Map<String, String> props;
 
@@ -48,13 +50,13 @@ public class LakeSoulExternalCatalog extends 
ExternalCatalog {
     @Override
     protected List<String> listDatabaseNames() {
         initLocalObjectsImpl();
-        return dbManager.listNamespaces();
+        return lakesoulMetadataManager.listNamespaces();
     }
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
-        List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName);
+        List<TableInfo> tifs = 
lakesoulMetadataManager.getTableInfosByNamespace(dbName);
         List<String> tableNames = Lists.newArrayList();
         for (TableInfo item : tifs) {
             tableNames.add(item.getTableName());
@@ -65,14 +67,13 @@ public class LakeSoulExternalCatalog extends 
ExternalCatalog {
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
         makeSureInitialized();
-        TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName, 
tblName);
-
+        TableInfo tableInfo = 
lakesoulMetadataManager.getTableInfoByNameAndNamespace(dbName, tblName);
         return null != tableInfo;
     }
 
     @Override
     protected void initLocalObjectsImpl() {
-        if (dbManager == null) {
+        if (lakesoulMetadataManager == null) {
             if (props != null) {
                 if (props.containsKey(DBUtil.urlKey)) {
                     System.setProperty(DBUtil.urlKey, 
props.get(DBUtil.urlKey));
@@ -84,13 +85,18 @@ public class LakeSoulExternalCatalog extends 
ExternalCatalog {
                     System.setProperty(DBUtil.passwordKey, 
props.get(DBUtil.passwordKey));
                 }
             }
-            dbManager = new DBManager();
+            lakesoulMetadataManager = new DBManager();
         }
     }
 
     public TableInfo getLakeSoulTable(String dbName, String tblName) {
         makeSureInitialized();
-        return dbManager.getTableInfoByNameAndNamespace(tblName, dbName);
+        return lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, 
dbName);
+    }
+
+    public List<PartitionInfo> listPartitionInfo(String tableId) {
+        makeSureInitialized();
+        return lakesoulMetadataManager.getAllPartitionInfo(tableId);
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
index 46e8d1db47c..a5cf3478ae8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
@@ -25,17 +25,23 @@ import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ExternalAnalysisTask;
 import org.apache.doris.thrift.TLakeSoulTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
 import com.google.common.collect.Lists;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -45,11 +51,20 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class LakeSoulExternalTable extends ExternalTable {
-
+    private static final Logger LOG = 
LogManager.getLogger(LakeSoulExternalTable.class);
     public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;
 
+    public final String tableId;
+
     public LakeSoulExternalTable(long id, String name, String dbName, 
LakeSoulExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE);
+        tableId = getLakeSoulTableInfo().getTableId();
+    }
+
+    @Override
+    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+        makeSureInitialized();
+        return new ExternalAnalysisTask(info);
     }
 
     private Type arrowFiledToDorisType(Field field) {
@@ -150,6 +165,7 @@ public class LakeSoulExternalTable extends ExternalTable {
         String tableSchema = tableInfo.getTableSchema();
         DBUtil.TablePartitionKeys partitionKeys = 
DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
         Schema schema;
+        LOG.info("tableSchema={}", tableSchema);
         try {
             schema = Schema.fromJSON(tableSchema);
         } catch (IOException e) {
@@ -174,6 +190,10 @@ public class LakeSoulExternalTable extends ExternalTable {
         return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, 
name);
     }
 
+    public List<PartitionInfo> listPartitionInfo() {
+        return ((LakeSoulExternalCatalog) catalog).listPartitionInfo(tableId);
+    }
+
     public String tablePath() {
         return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, 
name).getTablePath();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
new file mode 100644
index 00000000000..8f7cf83dbfc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
@@ -0,0 +1,535 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.Subquery;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TExprOpcode;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.expression.Expression;
+import io.substrait.extension.DefaultExtensionCatalog;
+import io.substrait.type.Type;
+import io.substrait.type.TypeCreator;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class LakeSoulUtils {
+
+    public static final String FILE_NAMES = "file_paths";
+    public static final String PRIMARY_KEYS = "primary_keys";
+    public static final String SCHEMA_JSON = "table_schema";
+    public static final String PARTITION_DESC = "partition_descs";
+    public static final String REQUIRED_FIELDS = "required_fields";
+    public static final String OPTIONS = "options";
+    public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+    public static final String CDC_COLUMN = "lakesoul_cdc_change_column";
+    public static final String LIST_DELIM = ";";
+    public static final String PARTITIONS_KV_DELIM = "=";
+    public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+    public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+    public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+    public static final String FS_S3A_REGION = "fs.s3a.endpoint.region";
+    public static final String FS_S3A_PATH_STYLE_ACCESS = 
"fs.s3a.path.style.access";
+
+    private static final OffsetDateTime EPOCH;
+    private static final LocalDate EPOCH_DAY;
+
+    static {
+        EPOCH = Instant.ofEpochSecond(0L).atOffset(ZoneOffset.UTC);
+        EPOCH_DAY = EPOCH.toLocalDate();
+    }
+
+    public static List<PartitionInfo> applyPartitionFilters(
+            List<PartitionInfo> allPartitionInfo,
+            String tableName,
+            Schema partitionArrowSchema,
+            Map<String, ColumnRange> columnNameToRange
+    ) throws IOException {
+
+        Expression conjunctionFilter = null;
+        for (Field field : partitionArrowSchema.getFields()) {
+            ColumnRange columnRange = columnNameToRange.get(field.getName());
+            if (columnRange != null) {
+                Expression expr = columnRangeToSubstraitFilter(field, 
columnRange);
+                if (expr != null) {
+                    if (conjunctionFilter == null) {
+                        conjunctionFilter = expr;
+                    } else {
+                        conjunctionFilter = 
SubstraitUtil.and(conjunctionFilter, expr);
+                    }
+                }
+            }
+        }
+        return SubstraitUtil.applyPartitionFilters(
+            allPartitionInfo,
+            partitionArrowSchema,
+            SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName)
+        );
+    }
+
+    public static Expression columnRangeToSubstraitFilter(
+            Field columnField,
+            ColumnRange columnRange
+    ) throws IOException {
+        Optional<RangeSet<ColumnBound>> rangeSetOpt = 
columnRange.getRangeSet();
+        if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) {
+            return SubstraitUtil.CONST_TRUE;
+        } else {
+            RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();
+            if (rangeSet.isEmpty()) {
+                return SubstraitUtil.CONST_TRUE;
+            } else {
+                Expression conjunctionFilter = null;
+                for (Range range : rangeSet.asRanges()) {
+                    Expression expr = rangeToSubstraitFilter(columnField, 
range);
+                    if (expr != null) {
+                        if (conjunctionFilter == null) {
+                            conjunctionFilter = expr;
+                        } else {
+                            conjunctionFilter = 
SubstraitUtil.or(conjunctionFilter, expr);
+                        }
+                    }
+                }
+                return conjunctionFilter;
+            }
+        }
+    }
+
+    public static Expression rangeToSubstraitFilter(Field columnField, Range 
range) throws IOException {
+        if (!range.hasLowerBound() && !range.hasUpperBound()) {
+            // Range.all()
+            return SubstraitUtil.CONST_TRUE;
+        } else {
+            Expression upper = SubstraitUtil.CONST_TRUE;
+            if (range.hasUpperBound()) {
+                String func = range.upperBoundType() == BoundType.OPEN ? 
"lt:any_any" : "lte:any_any";
+                Expression left = 
SubstraitUtil.arrowFieldToSubstraitField(columnField);
+                Expression right = SubstraitUtil.anyToSubstraitLiteral(
+                        SubstraitUtil.arrowFieldToSubstraitType(columnField),
+                        ((ColumnBound) 
range.upperEndpoint()).getValue().getRealValue());
+                upper = SubstraitUtil.makeBinary(
+                        left,
+                        right,
+                        DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                        func,
+                        TypeCreator.NULLABLE.BOOLEAN
+                );
+            }
+            Expression lower = SubstraitUtil.CONST_TRUE;
+            if (range.hasLowerBound()) {
+                String func = range.lowerBoundType() == BoundType.OPEN ? 
"gt:any_any" : "gte:any_any";
+                Expression left = 
SubstraitUtil.arrowFieldToSubstraitField(columnField);
+                Expression right = SubstraitUtil.anyToSubstraitLiteral(
+                        SubstraitUtil.arrowFieldToSubstraitType(columnField),
+                        ((ColumnBound) 
range.lowerEndpoint()).getValue().getRealValue());
+                lower = SubstraitUtil.makeBinary(
+                        left,
+                        right,
+                        DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                        func,
+                        TypeCreator.NULLABLE.BOOLEAN
+                );
+            }
+            return SubstraitUtil.and(upper, lower);
+        }
+    }
+
+    public static io.substrait.proto.Plan getPushPredicate(
+            List<Expr> conjuncts,
+            String tableName,
+            Schema tableSchema,
+            Schema partitionArrowSchema,
+            Map<String, String> properties,
+            boolean incRead
+    ) throws IOException {
+
+        Set<String> partitionColumn =
+                partitionArrowSchema
+                    .getFields()
+                    .stream()
+                    .map(Field::getName)
+                    .collect(Collectors.toSet());
+        Expression conjunctionFilter = null;
+        String cdcColumn = properties.get(CDC_COLUMN);
+        if (cdcColumn != null && !incRead) {
+            conjunctionFilter = 
SubstraitUtil.cdcColumnMergeOnReadFilter(tableSchema.findField(cdcColumn));
+        }
+        for (Expr expr : conjuncts) {
+            if (!isAllPartitionPredicate(expr, partitionColumn)) {
+                Expression predicate = convertToSubstraitExpr(expr, 
tableSchema);
+                if (predicate != null) {
+                    if (conjunctionFilter == null) {
+                        conjunctionFilter = predicate;
+                    } else {
+                        conjunctionFilter = 
SubstraitUtil.and(conjunctionFilter, predicate);
+                    }
+                }
+            }
+        }
+        if (conjunctionFilter == null) {
+            return null;
+        }
+        return SubstraitUtil.substraitExprToProto(conjunctionFilter, 
tableName);
+    }
+
+    public static boolean isAllPartitionPredicate(Expr predicate, Set<String> 
partitionColumns) {
+        if (predicate == null) {
+            return false;
+        }
+        if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+            return isAllPartitionPredicate(compoundPredicate.getChild(0), 
partitionColumns)
+                && isAllPartitionPredicate(compoundPredicate.getChild(1), 
partitionColumns);
+        }
+        // Make sure the col slot is always first
+        SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0));
+        LiteralExpr literalExpr = 
convertDorisExprToLiteralExpr(predicate.getChild(1));
+        if (slotRef == null || literalExpr == null) {
+            return false;
+        }
+        String colName = slotRef.getColumnName();
+        return partitionColumns.contains(colName);
+
+    }
+
+    public static SlotRef convertDorisExprToSlotRef(Expr expr) {
+        SlotRef slotRef = null;
+        if (expr instanceof SlotRef) {
+            slotRef = (SlotRef) expr;
+        } else if (expr instanceof CastExpr) {
+            if (expr.getChild(0) instanceof SlotRef) {
+                slotRef = (SlotRef) expr.getChild(0);
+            }
+        }
+        return slotRef;
+    }
+
+    public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) {
+        LiteralExpr literalExpr = null;
+        if (expr instanceof LiteralExpr) {
+            literalExpr = (LiteralExpr) expr;
+        } else if (expr instanceof CastExpr) {
+            if (expr.getChild(0) instanceof LiteralExpr) {
+                literalExpr = (LiteralExpr) expr.getChild(0);
+            }
+        }
+        return literalExpr;
+    }
+
+    public static Expression convertToSubstraitExpr(Expr predicate, Schema 
tableSchema) throws IOException {
+        if (predicate == null) {
+            return null;
+        }
+        if (predicate instanceof BoolLiteral) {
+            BoolLiteral boolLiteral = (BoolLiteral) predicate;
+            boolean value = boolLiteral.getValue();
+            if (value) {
+                return SubstraitUtil.CONST_TRUE;
+            } else {
+                return SubstraitUtil.CONST_FALSE;
+            }
+        }
+        if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+            switch (compoundPredicate.getOp()) {
+                case AND: {
+                    Expression left = 
convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    Expression right = 
convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+                    if (left != null && right != null) {
+                        return SubstraitUtil.and(left, right);
+                    } else if (left != null) {
+                        return left;
+                    } else {
+                        return right;
+                    }
+                }
+                case OR: {
+                    Expression left = 
convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    Expression right = 
convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+                    if (left != null && right != null) {
+                        return SubstraitUtil.or(left, right);
+                    }
+                    return null;
+                }
+                case NOT: {
+                    Expression child = 
convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    if (child != null) {
+                        return SubstraitUtil.not(child);
+                    }
+                    return null;
+                }
+                default:
+                    return null;
+            }
+        } else if (predicate instanceof InPredicate) {
+            InPredicate inExpr = (InPredicate) predicate;
+            if (inExpr.contains(Subquery.class)) {
+                return null;
+            }
+            SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0));
+            if (slotRef == null) {
+                return null;
+            }
+            String colName = slotRef.getColumnName();
+            Field field = tableSchema.findField(colName);
+            Expression fieldRef = 
SubstraitUtil.arrowFieldToSubstraitField(field);
+
+            colName = field.getName();
+            Type type = field.getType().accept(
+                new 
SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable())
+            );
+            List<Expression.Literal> valueList = new ArrayList<>();
+            for (int i = 1; i < inExpr.getChildren().size(); ++i) {
+                if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
+                    return null;
+                }
+                LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
+                Object value = extractDorisLiteral(type, literalExpr);
+                if (value == null) {
+                    return null;
+                }
+                valueList.add(SubstraitUtil.anyToSubstraitLiteral(type, 
value));
+            }
+            if (inExpr.isNotIn()) {
+                // not in
+                return SubstraitUtil.notIn(fieldRef, valueList);
+            } else {
+                // in
+                return SubstraitUtil.in(fieldRef, valueList);
+            }
+        }
+        return convertBinaryExpr(predicate, tableSchema);
+    }
+
+    private static Expression convertBinaryExpr(Expr dorisExpr, Schema 
tableSchema) throws IOException {
+        TExprOpcode opcode = dorisExpr.getOpcode();
+        // Make sure the col slot is always first
+        SlotRef slotRef = convertDorisExprToSlotRef(dorisExpr.getChild(0));
+        LiteralExpr literalExpr = 
convertDorisExprToLiteralExpr(dorisExpr.getChild(1));
+        if (slotRef == null || literalExpr == null) {
+            return null;
+        }
+        String colName = slotRef.getColumnName();
+        Field field = tableSchema.findField(colName);
+        Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field);
+
+        Type type = field.getType().accept(
+            new 
SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable())
+        );
+        Object value = extractDorisLiteral(type, literalExpr);
+        if (value == null) {
+            if (opcode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof 
NullLiteral) {
+                return SubstraitUtil.makeUnary(
+                        fieldRef,
+                        DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                        "is_null:any",
+                        TypeCreator.NULLABLE.BOOLEAN);
+            } else {
+                return null;
+            }
+        }
+        Expression literal = SubstraitUtil.anyToSubstraitLiteral(
+                type,
+                value
+        );
+
+        String namespace;
+        String func;
+        switch (opcode) {
+            case EQ:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "equal:any_any";
+                break;
+            case EQ_FOR_NULL:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "is_null:any";
+                break;
+            case NE:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "not_equal:any_any";
+                break;
+            case GE:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "gte:any_any";
+                break;
+            case GT:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "gt:any_any";
+                break;
+            case LE:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "lte:any_any";
+                break;
+            case LT:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "lt:any_any";
+                break;
+            case INVALID_OPCODE:
+                if (dorisExpr instanceof FunctionCallExpr) {
+                    String name = dorisExpr.getExprName().toLowerCase();
+                    String s = literalExpr.getStringValue();
+                    if (name.equals("like") && !s.startsWith("%") && 
s.endsWith("%")) {
+                        namespace = DefaultExtensionCatalog.FUNCTIONS_STRING;
+                        func = "like:bool";
+                        break;
+                    }
+                } else if (dorisExpr instanceof IsNullPredicate) {
+                    if (((IsNullPredicate) dorisExpr).isNotNull()) {
+                        namespace = 
DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                        func = "is_not_null:any";
+
+                    } else {
+                        namespace = 
DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                        func = "is_null:any";
+                    }
+                    break;
+                }
+                return null;
+            default:
+                return null;
+        }
+        return SubstraitUtil.makeBinary(fieldRef, literal, namespace, func, 
TypeCreator.NULLABLE.BOOLEAN);
+    }
+
+    public static Object extractDorisLiteral(Type type, LiteralExpr expr) {
+
+        if (expr instanceof BoolLiteral) {
+            if (type instanceof Type.Bool) {
+                return ((BoolLiteral) expr).getValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof DateLiteral) {
+            DateLiteral dateLiteral = (DateLiteral) expr;
+            if (type instanceof Type.Date) {
+                if (dateLiteral.getType().isDatetimeV2() || 
dateLiteral.getType().isDatetime()) {
+                    return null;
+                }
+                return dateLiteral.getLongValue();
+            }
+            if (type instanceof Type.TimestampTZ || type instanceof 
Type.Timestamp) {
+                return dateLiteral.getLongValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof DecimalLiteral) {
+            DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
+            if (type instanceof Type.Decimal) {
+                return decimalLiteral.getValue();
+            } else if (type instanceof Type.FP64) {
+                return decimalLiteral.getDoubleValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof FloatLiteral) {
+            FloatLiteral floatLiteral = (FloatLiteral) expr;
+
+            if (floatLiteral.getType() == org.apache.doris.catalog.Type.FLOAT) 
{
+                return type instanceof Type.FP32
+                    || type instanceof Type.FP64
+                    || type instanceof Type.Decimal ? ((FloatLiteral) 
expr).getValue() : null;
+            } else {
+                return type instanceof Type.FP64
+                    || type instanceof Type.Decimal ? ((FloatLiteral) 
expr).getValue() : null;
+            }
+        } else if (expr instanceof IntLiteral) {
+            if (type instanceof Type.I8
+                    || type instanceof Type.I16
+                    || type instanceof Type.I32
+                    || type instanceof Type.I64
+                    || type instanceof Type.FP32
+                    || type instanceof Type.FP64
+                    || type instanceof Type.Decimal
+                    || type instanceof Type.Date
+            ) {
+                return expr.getRealValue();
+            }
+            if (!expr.getType().isInteger32Type()) {
+                if (type instanceof Type.Time || type instanceof 
Type.Timestamp || type instanceof Type.TimestampTZ) {
+                    return expr.getLongValue();
+                }
+            }
+
+        } else if (expr instanceof StringLiteral) {
+            String value = expr.getStringValue();
+            if (type instanceof Type.Str) {
+                return value;
+            }
+            if (type instanceof Type.Date) {
+                try {
+                    return (int) ChronoUnit.DAYS.between(
+                        EPOCH_DAY,
+                        LocalDate.parse(value, 
DateTimeFormatter.ISO_LOCAL_DATE));
+                } catch (DateTimeParseException e) {
+                    return null;
+                }
+            }
+            if (type instanceof Type.Timestamp || type instanceof 
Type.TimestampTZ) {
+                try {
+                    return ChronoUnit.MICROS.between(
+                        EPOCH,
+                        OffsetDateTime.parse(value, 
DateTimeFormatter.ISO_DATE_TIME));
+                } catch (DateTimeParseException e) {
+                    return null;
+                }
+            }
+        }
+        return null;
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index 1779aeaca10..fd36bfd52bd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -17,14 +17,19 @@
 
 package org.apache.doris.datasource.lakesoul.source;
 
+
+
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
+import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -34,15 +39,26 @@ import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TLakeSoulFileDesc;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
 import com.dmetasoul.lakesoul.meta.DBUtil;
 import com.dmetasoul.lakesoul.meta.DataFileInfo;
 import com.dmetasoul.lakesoul.meta.DataOperation;
 import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
 import com.google.common.collect.Lists;
 import com.lakesoul.shaded.com.alibaba.fastjson.JSON;
 import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject;
+import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
+import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.proto.Plan;
+import lombok.SneakyThrows;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -52,19 +68,54 @@ import java.util.stream.Collectors;
 
 public class LakeSoulScanNode extends FileQueryScanNode {
 
-    protected final LakeSoulExternalTable lakeSoulExternalTable;
+    private static final Logger LOG = 
LogManager.getLogger(LakeSoulScanNode.class);
+    protected LakeSoulExternalTable lakeSoulExternalTable;
+
+    String tableName;
+
+    String location;
 
-    protected final TableInfo table;
+    String partitions;
+
+    Schema tableArrowSchema;
+
+    Schema partitionArrowSchema;
+    private Map<String, String> tableProperties;
+
+    String readType;
 
     public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
         super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, 
needCheckColumnPriv);
+    }
+
+    @Override
+    protected void doInitialize() throws UserException {
+        super.doInitialize();
         lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable();
-        table = lakeSoulExternalTable.getLakeSoulTableInfo();
+        TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+        location = tableInfo.getTablePath();
+        tableName = tableInfo.getTableName();
+        partitions = tableInfo.getPartitions();
+        readType = LakeSoulOptions.ReadType$.MODULE$.FULL_READ();
+        try {
+            tableProperties = new ObjectMapper().readValue(
+                tableInfo.getProperties(),
+                new TypeReference<Map<String, String>>() {}
+            );
+            tableArrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
+            List<Field> partitionFields =
+                    DBUtil.parseTableInfoPartitions(partitions)
+                        .rangeKeys
+                        .stream()
+                        
.map(tableArrowSchema::findField).collect(Collectors.toList());
+            partitionArrowSchema = new Schema(partitionFields);
+        } catch (IOException e) {
+            throw new UserException(e);
+        }
     }
 
     @Override
     protected TFileType getLocationType() throws UserException {
-        String location = table.getTablePath();
         return getLocationType(location);
     }
 
@@ -81,12 +132,12 @@ public class LakeSoulScanNode extends FileQueryScanNode {
 
     @Override
     protected List<String> getPathPartitionKeys() throws UserException {
-        return new 
ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys);
+        return new 
ArrayList<>(DBUtil.parseTableInfoPartitions(partitions).rangeKeys);
     }
 
     @Override
     protected TableIf getTargetTable() throws UserException {
-        return lakeSoulExternalTable;
+        return desc.getTable();
     }
 
     @Override
@@ -94,13 +145,21 @@ public class LakeSoulScanNode extends FileQueryScanNode {
         return lakeSoulExternalTable.getHadoopProperties();
     }
 
+    @SneakyThrows
     @Override
     protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}", rangeDesc);
+        }
         if (split instanceof LakeSoulSplit) {
             setLakeSoulParams(rangeDesc, (LakeSoulSplit) split);
         }
     }
 
+    public ExternalCatalog getCatalog() {
+        return lakeSoulExternalTable.getCatalog();
+    }
+
     public static boolean isExistHashPartition(TableInfo tif) {
         JSONObject tableProperties = JSON.parseObject(tif.getProperties());
         if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM())
@@ -111,13 +170,47 @@ public class LakeSoulScanNode extends FileQueryScanNode {
         }
     }
 
-    public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit 
lakeSoulSplit) {
+    public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit 
lakeSoulSplit) throws IOException {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value());
         TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc();
         fileDesc.setFilePaths(lakeSoulSplit.getPaths());
         fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys());
         fileDesc.setTableSchema(lakeSoulSplit.getTableSchema());
+
+
+        JSONObject options = new JSONObject();
+        Plan predicate = LakeSoulUtils.getPushPredicate(
+                conjuncts,
+                tableName,
+                tableArrowSchema,
+                partitionArrowSchema,
+                tableProperties,
+                
readType.equals(LakeSoulOptions.ReadType$.MODULE$.INCREMENTAL_READ()));
+        if (predicate != null) {
+            options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, 
SubstraitUtil.encodeBase64String(predicate));
+        }
+        Map<String, String> catalogProps = getCatalog().getProperties();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}", catalogProps);
+        }
+
+        if (catalogProps.get(S3Properties.Env.ENDPOINT) != null) {
+            options.put(LakeSoulUtils.FS_S3A_ENDPOINT, 
catalogProps.get(S3Properties.Env.ENDPOINT));
+            options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "true");
+            if (catalogProps.get(S3Properties.Env.ACCESS_KEY) != null) {
+                options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY, 
catalogProps.get(S3Properties.Env.ACCESS_KEY));
+            }
+            if (catalogProps.get(S3Properties.Env.SECRET_KEY) != null) {
+                options.put(LakeSoulUtils.FS_S3A_SECRET_KEY, 
catalogProps.get(S3Properties.Env.SECRET_KEY));
+            }
+            if (catalogProps.get(S3Properties.Env.REGION) != null) {
+                options.put(LakeSoulUtils.FS_S3A_REGION, 
catalogProps.get(S3Properties.Env.REGION));
+            }
+        }
+
+        fileDesc.setOptions(JSON.toJSONString(options));
+
         fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc()
                 .entrySet().stream().map(entry ->
                         String.format("%s=%s", entry.getKey(), 
entry.getValue())).collect(Collectors.toList()));
@@ -126,24 +219,51 @@ public class LakeSoulScanNode extends FileQueryScanNode {
     }
 
     public List<Split> getSplits() throws UserException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getSplits with columnFilters={}", columnFilters);
+            LOG.debug("getSplits with columnNameToRange={}", 
columnNameToRange);
+            LOG.debug("getSplits with conjuncts={}", conjuncts);
+        }
+
+        List<PartitionInfo> allPartitionInfo = 
lakeSoulExternalTable.listPartitionInfo();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("allPartitionInfo={}", allPartitionInfo);
+        }
+        List<PartitionInfo> filteredPartitionInfo = allPartitionInfo;
+        try {
+            filteredPartitionInfo =
+                    LakeSoulUtils.applyPartitionFilters(
+                        allPartitionInfo,
+                        tableName,
+                        partitionArrowSchema,
+                        columnNameToRange
+                    );
+        } catch (IOException e) {
+            throw new UserException(e);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("filteredPartitionInfo={}", filteredPartitionInfo);
+        }
+        DataFileInfo[] dataFileInfos = 
DataOperation.getTableDataInfo(filteredPartitionInfo);
+
         List<Split> splits = new ArrayList<>();
         Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition = 
new LinkedHashMap<>();
-        TableInfo tif = table;
-        DataFileInfo[] dfinfos = 
DataOperation.getTableDataInfo(table.getTableId());
-        for (DataFileInfo pif : dfinfos) {
-            if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) {
-                
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new 
LinkedHashMap<>())
-                        .computeIfAbsent(pif.file_bucket_id(), v -> new 
ArrayList<>())
-                        .add(pif.path());
+        TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+
+        for (DataFileInfo fileInfo : dataFileInfos) {
+            if (isExistHashPartition(tableInfo) && fileInfo.file_bucket_id() 
!= -1) {
+                
splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> 
new LinkedHashMap<>())
+                        .computeIfAbsent(fileInfo.file_bucket_id(), v -> new 
ArrayList<>())
+                        .add(fileInfo.path());
             } else {
-                
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new 
LinkedHashMap<>())
+                
splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> 
new LinkedHashMap<>())
                         .computeIfAbsent(-1, v -> new ArrayList<>())
-                        .add(pif.path());
+                        .add(fileInfo.path());
             }
         }
         List<String> pkKeys = null;
-        if (!table.getPartitions().equals(";")) {
-            pkKeys = 
Lists.newArrayList(table.getPartitions().split(";")[1].split(","));
+        if (!tableInfo.getPartitions().equals(";")) {
+            pkKeys = 
Lists.newArrayList(tableInfo.getPartitions().split(";")[1].split(","));
         }
 
         for (Map.Entry<String, Map<Integer, List<String>>> entry : 
splitByRangeAndHashPartition.entrySet()) {
@@ -161,7 +281,7 @@ public class LakeSoulScanNode extends FileQueryScanNode {
                         split.getValue(),
                         pkKeys,
                         rangeDesc,
-                        table.getTableSchema(),
+                        tableInfo.getTableSchema(),
                         0, 0, 0,
                         new String[0], null);
                 lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 82945fb6963..64178846abf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -283,6 +283,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
                 case PAIMON_EXTERNAL_TABLE:
                 case MAX_COMPUTE_EXTERNAL_TABLE:
                 case TRINO_CONNECTOR_EXTERNAL_TABLE:
+                case LAKESOUl_EXTERNAL_TABLE:
                     return new 
LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
                             qualifierWithoutTableName, 
unboundRelation.getTableSample(),
                             unboundRelation.getTableSnapshot());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
new file mode 100644
index 00000000000..aebd74f5e02
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.DateUnit;
+import 
com.lakesoul.shaded.org.apache.arrow.vector.types.FloatingPointPrecision;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.TimeUnit;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.FieldType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.expression.Expression;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LakeSoulPredicateTest {
+
+    public static Schema schema;
+
+    @BeforeClass
+    public static void before() throws AnalysisException, IOException {
+        schema = new Schema(
+                Arrays.asList(
+                    new Field("c_int", FieldType.nullable(new 
ArrowType.Int(32, true)), null),
+                    new Field("c_long", FieldType.nullable(new 
ArrowType.Int(64, true)), null),
+                    new Field("c_bool", FieldType.nullable(new 
ArrowType.Bool()), null),
+                    new Field("c_float", FieldType.nullable(new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null),
+                    new Field("c_double", FieldType.nullable(new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null),
+                    new Field("c_dec", FieldType.nullable(new 
ArrowType.Decimal(20, 10)), null),
+                    new Field("c_date", FieldType.nullable(new 
ArrowType.Date(DateUnit.DAY)), null),
+                    new Field("c_ts", FieldType.nullable(new 
ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), null),
+                    new Field("c_str", FieldType.nullable(new 
ArrowType.Utf8()), null)
+                ));
+    }
+
+    @Test
+    public void testBinaryPredicate() throws AnalysisException, IOException {
+        List<LiteralExpr> literalList = new ArrayList<LiteralExpr>() {{
+                add(new BoolLiteral(true));
+                add(new DateLiteral("2023-01-02", Type.DATEV2));
+                add(new DateLiteral("2024-01-02 12:34:56.123456", 
Type.DATETIMEV2));
+                add(new DecimalLiteral(new BigDecimal("1.23")));
+                add(new FloatLiteral(1.23, Type.FLOAT));
+                add(new FloatLiteral(3.456, Type.DOUBLE));
+                add(new IntLiteral(1, Type.TINYINT));
+                add(new IntLiteral(1, Type.SMALLINT));
+                add(new IntLiteral(1, Type.INT));
+                add(new IntLiteral(1, Type.BIGINT));
+                add(new StringLiteral("abc"));
+                add(new StringLiteral("2023-01-02"));
+                add(new StringLiteral("2023-01-02 01:02:03.456789"));
+            }};
+
+        List<SlotRef> slotRefs = new ArrayList<SlotRef>() {{
+                add(new SlotRef(new TableName(), "c_int"));
+                add(new SlotRef(new TableName(), "c_long"));
+                add(new SlotRef(new TableName(), "c_bool"));
+                add(new SlotRef(new TableName(), "c_float"));
+                add(new SlotRef(new TableName(), "c_double"));
+                add(new SlotRef(new TableName(), "c_dec"));
+                add(new SlotRef(new TableName(), "c_date"));
+                add(new SlotRef(new TableName(), "c_ts"));
+                add(new SlotRef(new TableName(), "c_str"));
+            }};
+
+        // true indicates support for pushdown
+        Boolean[][] expects = new Boolean[][] {
+                { // int
+                        false, false, false, false, false, false, true, true, 
true, true, false, false, false
+                },
+                { // long
+                        false, false, false, false, false, false, true, true, 
true, true, false, false, false
+                },
+                { // boolean
+                        true, false, false, false, false, false, false, false, 
false, false, false, false, false
+                },
+                { // float
+                        false, false, false, false, true, false, true, true, 
true, true, false, false, false
+                },
+                { // double
+                        false, false, false, true, true, true, true, true, 
true, true, false, false, false
+                },
+                { // decimal
+                        false, false, false, true, true, true, true, true, 
true, true, false, false, false
+                },
+                { // date
+                        false, true, false, false, false, false, true, true, 
true, true, false, true, false
+                },
+                { // timestamp
+                        false, true, true, false, false, false, false, false, 
false, true, false, false, false
+                },
+                { // string
+                        true, true, true, true, false, false, false, false, 
false, false, true, true, true
+                }
+        };
+
+        ArrayListMultimap<Boolean, Expr> validPredicateMap = 
ArrayListMultimap.create();
+
+        // binary predicate
+        for (int i = 0; i < slotRefs.size(); i++) {
+            final int loc = i;
+            List<Boolean> ret = literalList.stream().map(literal -> {
+                BinaryPredicate expr = new 
BinaryPredicate(BinaryPredicate.Operator.EQ, slotRefs.get(loc), literal);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, 
schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                validPredicateMap.put(expression != null, expr);
+                return expression != null;
+            }).collect(Collectors.toList());
+            Assert.assertArrayEquals(expects[i], ret.toArray());
+        }
+
+        // in predicate
+        for (int i = 0; i < slotRefs.size(); i++) {
+            final int loc = i;
+            List<Boolean> ret = literalList.stream().map(literal -> {
+                InPredicate expr = new InPredicate(slotRefs.get(loc), 
Lists.newArrayList(literal), false);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, 
schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                validPredicateMap.put(expression != null, expr);
+                return expression != null;
+            }).collect(Collectors.toList());
+            Assert.assertArrayEquals(expects[i], ret.toArray());
+        }
+
+        // not in predicate
+        for (int i = 0; i < slotRefs.size(); i++) {
+            final int loc = i;
+            List<Boolean> ret = literalList.stream().map(literal -> {
+                InPredicate expr = new InPredicate(slotRefs.get(loc), 
Lists.newArrayList(literal), true);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, 
schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                validPredicateMap.put(expression != null, expr);
+                return expression != null;
+            }).collect(Collectors.toList());
+            Assert.assertArrayEquals(expects[i], ret.toArray());
+        }
+
+        // bool literal
+
+        Expression trueExpr = LakeSoulUtils.convertToSubstraitExpr(new 
BoolLiteral(true), schema);
+        Expression falseExpr = LakeSoulUtils.convertToSubstraitExpr(new 
BoolLiteral(false), schema);
+        Assert.assertEquals(SubstraitUtil.CONST_TRUE, trueExpr);
+        Assert.assertEquals(SubstraitUtil.CONST_FALSE, falseExpr);
+        validPredicateMap.put(true, new BoolLiteral(true));
+        validPredicateMap.put(true, new BoolLiteral(false));
+
+        List<Expr> validExprs = validPredicateMap.get(true);
+        List<Expr> invalidExprs = validPredicateMap.get(false);
+        // OR predicate
+        // both valid
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < validExprs.size(); j++) {
+                CompoundPredicate orPredicate = new 
CompoundPredicate(Operator.OR,
+                        validExprs.get(i), validExprs.get(j));
+                Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+                Assert.assertNotNull("pred: " + orPredicate.toSql(), 
expression);
+            }
+        }
+        // both invalid
+        for (int i = 0; i < invalidExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate orPredicate = new 
CompoundPredicate(Operator.OR,
+                        invalidExprs.get(i), invalidExprs.get(j));
+                Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+                Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+            }
+        }
+        // valid or invalid
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate orPredicate = new 
CompoundPredicate(Operator.OR,
+                        validExprs.get(i), invalidExprs.get(j));
+                Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+                Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+            }
+        }
+
+        // AND predicate
+        // both valid
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < validExprs.size(); j++) {
+                CompoundPredicate andPredicate = new 
CompoundPredicate(Operator.AND,
+                        validExprs.get(i), validExprs.get(j));
+                Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNotNull("pred: " + andPredicate.toSql(), 
expression);
+            }
+        }
+        // both invalid
+        for (int i = 0; i < invalidExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate andPredicate = new 
CompoundPredicate(Operator.AND,
+                        invalidExprs.get(i), invalidExprs.get(j));
+                Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNull("pred: " + andPredicate.toSql(), expression);
+            }
+        }
+        // valid and invalid
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate andPredicate = new 
CompoundPredicate(Operator.AND,
+                        validExprs.get(i), invalidExprs.get(j));
+                Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNotNull("pred: " + andPredicate.toSql(), 
expression);
+                
Assert.assertEquals(SubstraitUtil.substraitExprToProto(LakeSoulUtils.convertToSubstraitExpr(validExprs.get(i),
 schema), "table"),
+                        SubstraitUtil.substraitExprToProto(expression, 
"table"));
+            }
+        }
+
+        // NOT predicate
+        // valid
+        for (int i = 0; i < validExprs.size(); i++) {
+            CompoundPredicate notPredicate = new 
CompoundPredicate(Operator.NOT,
+                    validExprs.get(i), null);
+            Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+            Assert.assertNotNull("pred: " + notPredicate.toSql(), expression);
+        }
+        // invalid
+        for (int i = 0; i < invalidExprs.size(); i++) {
+            CompoundPredicate notPredicate = new 
CompoundPredicate(Operator.NOT,
+                    invalidExprs.get(i), null);
+            Expression expression = 
LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+            Assert.assertNull("pred: " + notPredicate.toSql(), expression);
+        }
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index 0c08bb327ae..3d43297fe21 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -317,6 +317,8 @@ under the License.
         <hudi.version>0.14.1</hudi.version>
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
         <presto.hive.version>3.0.0-8</presto.hive.version>
+        <!-- lakesoul -->
+        <lakesoul.version>2.6.1</lakesoul.version>
 
         <parquet.version>1.13.1</parquet.version>
         <commons-collections.version>3.2.2</commons-collections.version>
@@ -1831,4 +1833,4 @@ under the License.
             </snapshots>
         </repository>
     </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index 527b0231394..f4fb41fdf55 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -224,3 +224,14 @@ enableTrinoConnectorTest = false
 enableKerberosTest=false
 kerberosHmsPort=9883
 kerberosHdfsPort=8820
+
+
+// LakeSoul catalog test config
+enableLakesoulTest = false
+lakesoulPGUser="*******"
+lakesoulPGPwd="*******"
+lakesoulPGUrl="*******"
+lakesoulMinioAK="*******"
+lakesoulMinioSK="*******"
+lakesoulMinioEndpoint="*******"
+
diff --git 
a/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out 
b/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out
new file mode 100644
index 00000000000..cb1899326bd
--- /dev/null
+++ b/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !lakesoul --
+0      AFRICA  lar deposits. blithely final packages cajole. regular waters 
are final requests. regular accounts are according to 
+2      ASIA    ges. thinly even pinto beans ca
+3      EUROPE  ly final courts cajole furiously final excuse
+1      AMERICA hs use ironic, even requests. s
+4      MIDDLE EAST     uickly special accounts cajole carefully blithely close 
requests. carefully final asymptotes haggle furiousl
+
diff --git a/regression-test/pipeline/external/conf/regression-conf.groovy 
b/regression-test/pipeline/external/conf/regression-conf.groovy
index 28956568e58..0299066fa02 100644
--- a/regression-test/pipeline/external/conf/regression-conf.groovy
+++ b/regression-test/pipeline/external/conf/regression-conf.groovy
@@ -163,3 +163,7 @@ enableTrinoConnectorTest = true
 enableKerberosTest = true
 kerberosHmsPort=9883
 kerberosHdfsPort=8820
+
+
+// LakeSoul catalog test config
+enableLakesoulTest = true
diff --git 
a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
 
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
index e0b8a924c30..ffd95d93097 100644
--- 
a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
+++ 
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
@@ -16,20 +16,33 @@
 // under the License.
 
 suite("test_lakesoul_catalog", 
"p0,external,doris,external_docker,external_docker_doris") {
-    def enabled = false;
+    String enabled = context.config.otherConfigs.get("enableLakesoulTest")
     // open it when docker image is ready to run in regression test
-    if (enabled) {
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String catalog_name = "lakesoul"
         String db_name = "default"
+        String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+        String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+        String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+        String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+        String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+        String minio_endpoint = 
context.config.otherConfigs.get("lakesoulMinioEndpoint")
 
         sql """drop catalog if exists ${catalog_name}"""
-        sql """
-            create catalog lakesoul  properties 
('type'='lakesoul','lakesoul.pg.username'='lakesoul_test','lakesoul.pg.password'='lakesoul_test','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified');"""
+        sql """create catalog lakesoul  properties (
+            'type'='lakesoul',
+            'lakesoul.pg.username'='${pg_user}',
+            'lakesoul.pg.password'='${pg_pwd}',
+            'lakesoul.pg.url'='${pg_url}',
+            'minio.endpoint'='${minio_endpoint}',
+            'minio.access_key'='${minio_ak}',
+            'minio.secret_key'='${minio_sk}'
+            );"""
 
         // analyze
         sql """use `${catalog_name}`.`${db_name}`"""
 
- sq     """show tables;"""
+        sql """show tables;"""
         // select
         sql  """select * from nation;"""
 
diff --git 
a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy 
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
new file mode 100644
index 00000000000..799e8ba61bb
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_lakesoul_filter", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableLakesoulTest")
+    // open it when docker image is ready to run in regression test
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String catalog_name = "lakesoul"
+        String db_name = "default"
+        String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+        String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+        String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+        String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+        String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+        String minio_endpoint = 
context.config.otherConfigs.get("lakesoulMinioEndpoint")
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog lakesoul  properties (
+            'type'='lakesoul',
+            'lakesoul.pg.username'='${pg_user}',
+            'lakesoul.pg.password'='${pg_pwd}',
+            'lakesoul.pg.url'='${pg_url}',
+            'minio.endpoint'='${minio_endpoint}',
+            'minio.access_key'='${minio_ak}',
+            'minio.secret_key'='${minio_sk}'
+            );"""
+
+        // analyze
+        sql """use `${catalog_name}`.`${db_name}`"""
+
+        sql """show tables;"""
+        // select
+        sql """select * from region;"""
+
+        sql """select * from nation;"""
+
+        sql """select * from nation where n_regionkey = 0 or n_nationkey > 
14;"""
+
+        sql """select * from nation where n_regionkey = 0 and n_nationkey > 
0;"""
+
+        sql """select * from nation where n_regionkey = 0;"""
+    }
+}
+
diff --git 
a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
 
b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
index 9369a28e8fe..bb85dc687d7 100644
--- 
a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
+++ 
b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
@@ -34,13 +34,25 @@ suite("test_external_table_lakesoul", 
"p2,external,lakesoul,external_remote,exte
 
 
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
-            String catalog_name = "lakesoul"
-            String db_name = "default"
+        String catalog_name = "lakesoul"
+        String db_name = "default"
+        String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+        String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+        String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+        String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+        String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+        String minio_endpoint = 
context.config.otherConfigs.get("lakesoulMinioEndpoint")
 
-            sql """drop catalog if exists ${catalog_name}"""
-            sql """
-                create catalog lakesoul  properties 
('type'='lakesoul','lakesoul.pg.username'='lakesoul','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5433/lakesoul_test?stringtype=unspecified');
-                """
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog lakesoul  properties (
+            'type'='lakesoul',
+            'lakesoul.pg.username'='${pg_user}',
+            'lakesoul.pg.password'='${pg_pwd}',
+            'lakesoul.pg.url'='${pg_url}',
+            'minio.endpoint'='${minio_endpoint}',
+            'minio.access_key'='${minio_ak}',
+            'minio.secret_key'='${minio_sk}'
+            );"""
 
             // analyze
             sql """use `${catalog_name}`.`${db_name}`""" 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to