This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 719af36f138 [Cherry-Pick][Fix](lakesoul) Fix CVEs and some filter pushdown issues (#39044) (#40812)
719af36f138 is described below

commit 719af36f1384892ae25de362c83ffac162569e60
Author: Ceng <441651...@qq.com>
AuthorDate: Mon Sep 23 14:09:36 2024 +0800

    [Cherry-Pick][Fix](lakesoul) Fix CVEs and some filter pushdown issues (#39044) (#40812)
    
    bp #39044
    
    Signed-off-by: dmetasoul01 <opensou...@dmetasoul.com>
    Co-authored-by: Xu Chen <xuchen-p...@users.noreply.github.com>
    Co-authored-by: dmetasoul01 <opensou...@dmetasoul.com>
---
 fe/be-java-extensions/lakesoul-scanner/pom.xml     |  74 +--
 .../apache/doris/lakesoul/LakeSoulJniScanner.java  |  61 ++-
 .../org/apache/doris/lakesoul/LakeSoulUtils.java   |  30 +-
 .../apache/doris/lakesoul/arrow/ArrowUtils.java    |  24 +-
 .../lakesoul/arrow/LakeSoulArrowJniScanner.java    |  58 ++-
 .../doris/lakesoul/parquet/ParquetFilter.java      | 288 -----------
 fe/fe-core/pom.xml                                 |  69 ++-
 .../lakesoul/LakeSoulExternalCatalog.java          |  48 +-
 .../datasource/lakesoul/LakeSoulExternalTable.java |  26 +-
 .../doris/datasource/lakesoul/LakeSoulUtils.java   | 526 +++++++++++++++++++++
 .../lakesoul/source/LakeSoulScanNode.java          | 170 ++++++-
 .../datasource/lakesoul/LakeSoulPredicateTest.java | 280 +++++++++++
 fe/pom.xml                                         |   4 +-
 .../lakesoul/test_lakesoul_filter.groovy           |  74 +++
 .../lakesoul/test_external_table_lakesoul.groovy   |  24 +-
 15 files changed, 1265 insertions(+), 491 deletions(-)

diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml
index cbbb473483e..4a1ae6b2e8a 100644
--- a/fe/be-java-extensions/lakesoul-scanner/pom.xml
+++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml
@@ -47,87 +47,19 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-vector</artifactId>
-            <version>${arrow.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-memory-unsafe</artifactId>
-            <version>${arrow.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-c-data</artifactId>
-            <version>${arrow.version}</version>
-        </dependency>
-
-        <!-- scala deps -->
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-compiler</artifactId>
-            <version>${scala.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-reflect</artifactId>
-            <version>${scala.version}</version>
-        </dependency>
 
         <dependency>
             <groupId>com.dmetasoul</groupId>
             <artifactId>lakesoul-io-java</artifactId>
-            <version>2.5.4</version>
+            <version>${lakesoul.version}</version>
+            <classifier>shaded</classifier>
             <exclusions>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.arrow</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.antlr</groupId>
-                    <artifactId>antlr4-runtime</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.code.findbugs</groupId>
-                    <artifactId>jsr305</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
+                    <groupId>*</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.parquet</groupId>
-            <artifactId>parquet-column</artifactId>
-            <version>1.12.2</version>
-        </dependency>
-
     </dependencies>
 
     <build>
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
index 3dfbff756db..bedef57d3b7 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
@@ -17,25 +17,30 @@
 
 package org.apache.doris.lakesoul;
 
-import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
-import org.apache.doris.lakesoul.parquet.ParquetFilter;
 
 import com.dmetasoul.lakesoul.LakeSoulArrowReader;
 import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
+import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import com.lakesoul.shaded.io.substrait.proto.Plan;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import com.lakesoul.shaded.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
+    private static final Logger LOG = LoggerFactory.getLogger(LakeSoulJniScanner.class);
 
     private final Map<String, String> params;
 
@@ -60,6 +65,8 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
         withAllocator(nativeIOReader.getAllocator());
         nativeIOReader.setBatchSize(batchSize);
 
+        LOG.info("opening LakeSoulJniScanner with params={}", params);
+
         // add files
        for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
             nativeIOReader.addFile(file);
@@ -72,19 +79,43 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
                    Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
         }
 
-        Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
+        String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}");
+        Map<String, String> optionsMap = new ObjectMapper().readValue(
+                options, new TypeReference<Map<String, String>>() {}
+        );
+        String base64Predicate = optionsMap.get(LakeSoulUtils.SUBSTRAIT_PREDICATE);
+        if (base64Predicate != null) {
+            Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("push predicate={}", predicate);
+            }
+            nativeIOReader.addFilterProto(predicate);
+        }
+
+        for (String key : LakeSoulUtils.OBJECT_STORE_OPTIONS) {
+            String value = optionsMap.get(key);
+            if (value != null) {
+                nativeIOReader.setObjectStoreOption(key, value);
+            }
+        }
+
+        Schema tableSchema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
         String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM);
 
         List<Field> requiredFields = new ArrayList<>();
         for (String fieldName : requiredFieldNames) {
-            requiredFields.add(schema.findField(fieldName));
+            String name = fieldName;
+            if (StringUtils.isEmpty(name)) {
+                continue;
+            }
+            requiredFields.add(tableSchema.findField(fieldName));
         }
 
         requiredSchema = new Schema(requiredFields);
 
         nativeIOReader.setSchema(requiredSchema);
 
-        HashSet<String> partitionColumn = new HashSet<>();
+        List<Field> partitionFields = new ArrayList<>();
        for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "")
                 .split(LakeSoulUtils.LIST_DELIM)) {
             if (partitionKV.isEmpty()) {
@@ -94,17 +125,15 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
             if (kv.length != 2) {
                throw new IllegalArgumentException("Invalid partition column = " + partitionKV);
             }
-            partitionColumn.add(kv[0]);
+            nativeIOReader.setDefaultColumnValue(kv[0], kv[1]);
+            partitionFields.add(tableSchema.findField(kv[0]));
+        }
+        if (!partitionFields.isEmpty()) {
+            nativeIOReader.setPartitionSchema(new Schema(partitionFields));
         }
 
         initTableInfo(params);
 
-        for (ScanPredicate predicate : predicates) {
-            if (!partitionColumn.contains(predicate.columName)) {
-                nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString());
-            }
-        }
-
         nativeIOReader.initializeReader();
        lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout);
     }
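
Note for reviewers: with this change the BE scanner no longer receives ScanPredicates; everything arrives through the params map. A minimal sketch of how the producing side could assemble those params, assuming SubstraitUtil also exposes an encodeBase64String counterpart to the decodeBase64String call above (the helper class and literal values here are illustrative, not part of this commit):

    import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.doris.lakesoul.LakeSoulUtils;

    import java.util.HashMap;
    import java.util.Map;

    public class ScanParamsSketch {
        public static Map<String, String> buildParams(String base64Predicate) throws Exception {
            // Options travel as one JSON map under "options" and are decoded in
            // LakeSoulJniScanner.open() with the same ObjectMapper/TypeReference pair.
            Map<String, String> options = new HashMap<>();
            options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, base64Predicate); // base64-encoded Substrait Plan
            options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY, "ak"); // forwarded via setObjectStoreOption
            Map<String, String> params = new HashMap<>();
            params.put(LakeSoulUtils.OPTIONS, new ObjectMapper().writeValueAsString(options));
            return params;
        }
    }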
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
index 6c7f88f3ab3..ca07a81d0da 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
@@ -17,13 +17,29 @@
 
 package org.apache.doris.lakesoul;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class LakeSoulUtils {
-    public static String FILE_NAMES = "file_paths";
-    public static String PRIMARY_KEYS = "primary_keys";
-    public static String SCHEMA_JSON = "table_schema";
-    public static String PARTITION_DESC = "partition_descs";
-    public static String REQUIRED_FIELDS = "required_fields";
+    public static final String FILE_NAMES = "file_paths";
+    public static final String PRIMARY_KEYS = "primary_keys";
+    public static final String SCHEMA_JSON = "table_schema";
+    public static final String PARTITION_DESC = "partition_descs";
+    public static final String REQUIRED_FIELDS = "required_fields";
+    public static final String OPTIONS = "options";
+    public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+    public static final String LIST_DELIM = ";";
+    public static final String PARTITIONS_KV_DELIM = "=";
+
+    public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+    public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+    public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+    public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
 
-    public static String LIST_DELIM = ";";
-    public static String PARTITIONS_KV_DELIM = "=";
+    public static final List<String> OBJECT_STORE_OPTIONS = Arrays.asList(
+            FS_S3A_ACCESS_KEY,
+            FS_S3A_SECRET_KEY,
+            FS_S3A_ENDPOINT,
+            FS_S3A_PATH_STYLE_ACCESS
+    );
 }
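
The OBJECT_STORE_OPTIONS list added here is the whitelist consumed by the loop in LakeSoulJniScanner.open() above. A small sketch of that consumption pattern (the conf map is illustrative); note the guard belongs on the looked-up value, since absent keys yield null:

    import org.apache.doris.lakesoul.LakeSoulUtils;

    import java.util.HashMap;
    import java.util.Map;

    public class ObjectStoreOptionsSketch {
        // Copies only the recognized fs.s3a.* options, skipping keys that are not set.
        public static Map<String, String> pickObjectStoreOptions(Map<String, String> conf) {
            Map<String, String> picked = new HashMap<>();
            for (String key : LakeSoulUtils.OBJECT_STORE_OPTIONS) {
                String value = conf.get(key);
                if (value != null) {
                    picked.put(key, value);
                }
            }
            return picked;
        }
    }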
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
index 3ad28ba783a..a3f9bc4788d 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
@@ -20,10 +20,10 @@ package org.apache.doris.lakesoul.arrow;
 import org.apache.doris.common.jni.utils.OffHeap;
 import org.apache.doris.common.jni.utils.TypeNativeBytes;
 
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-import org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.util.Preconditions;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -40,7 +40,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, 0, ZoneOffset.UTC);
             OffHeap.putLong(null, address + offset,
                     TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
             offset += 8;
 
         }
@@ -58,7 +58,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
             OffHeap.putLong(null, address + offset,
                     TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
             offset += 8;
 
         }
@@ -76,7 +76,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
             OffHeap.putLong(null, address + offset,
                     TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
             offset += 8;
 
         }
@@ -94,7 +94,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
             OffHeap.putLong(null, address + offset,
                     TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
             offset += 8;
 
         }
@@ -215,11 +215,9 @@ public class ArrowUtils {
         return hiveType.toString();
     }
 
-    private static class ArrowTypeToHiveTypeConverter
-            implements ArrowType.ArrowTypeVisitor<String> {
+    private static class ArrowTypeToHiveTypeConverter implements ArrowType.ArrowTypeVisitor<String> {
 
-        private static final ArrowTypeToHiveTypeConverter INSTANCE =
-                new ArrowTypeToHiveTypeConverter();
+        private static final ArrowTypeToHiveTypeConverter INSTANCE = new ArrowTypeToHiveTypeConverter();
 
         @Override
         public String visit(ArrowType.Null type) {
@@ -359,6 +357,4 @@ public class ArrowUtils {
             return "unsupported";
         }
     }
-
-
 }
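
The four hunks above all repeat one conversion: pack a UTC LocalDateTime into Doris's DateTimeV2 long encoding, truncating nanoseconds to microseconds. A condensed sketch of that step, using the TypeNativeBytes call exactly as it appears above (the wrapper class is illustrative):

    import org.apache.doris.common.jni.utils.TypeNativeBytes;

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    public class DateTimeV2Sketch {
        // Packs an epoch-seconds timestamp (plus nanos) into the DateTimeV2 long layout.
        public static long toDateTimeV2(long epochSec, int nanoSec) {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, nanoSec, ZoneOffset.UTC);
            return TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(),
                    v.getHour(), v.getMinute(), v.getSecond(), v.getNano() / 1000);
        }
    }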
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
index 320d653a20a..b8fb8e92bd4 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
@@ -23,28 +23,28 @@ import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.VectorTable;
 
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliTZVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampNanoTZVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
-import org.apache.arrow.vector.TimeStampSecTZVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.complex.StructVector;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.memory.BufferAllocator;
+import com.lakesoul.shaded.org.apache.arrow.vector.BitVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DateDayVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DecimalVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.FieldVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.ValueVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VarCharVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.ListVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.StructVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
@@ -101,7 +101,7 @@ public class LakeSoulArrowJniScanner extends JniScanner {
         String[] requiredFields = new String[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
             columnTypes[i] =
-                    ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+                ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
             requiredFields[i] = fields.get(i).getName();
         }
         predicates = new ScanPredicate[0];
@@ -116,8 +116,8 @@ public class LakeSoulArrowJniScanner extends JniScanner {
         String[] requiredFields = new String[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
             columnTypes[i] =
-                    ColumnType.parseType(fields.get(i).getName(),
-                            ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+                ColumnType.parseType(fields.get(i).getName(),
+                    ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
             requiredFields[i] = fields.get(i).getName();
         }
 
@@ -142,10 +142,8 @@ public class LakeSoulArrowJniScanner extends JniScanner {
     private Integer fillMetaAddressVector(int batchSize, ColumnType columnType, long metaAddress, Integer offset,
                                           ValueVector valueVector) {
         // nullMap
-        long
-                validityBuffer =
-                ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize,
-                        valueVector.getField().isNullable());
+        long validityBuffer = ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize,
+                valueVector.getField().isNullable());
         extraOffHeap.add(validityBuffer);
         OffHeap.putLong(null, metaAddress + (offset++) * 8, validityBuffer);
 
@@ -172,7 +170,7 @@ public class LakeSoulArrowJniScanner extends JniScanner {
                     continue;
                 }
                offset = fillMetaAddressVector(batchSize, columnType.getChildTypes().get(i), metaAddress, offset,
-                        childrenVector);
+                    childrenVector);
             }
 
         } else if (columnType.isStringType()) {
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
deleted file mode 100644
index 7d2820acd79..00000000000
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
+++ /dev/null
@@ -1,288 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.lakesoul.parquet;
-
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
-
-import org.apache.parquet.filter2.predicate.FilterApi;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.io.api.Binary;
-
-public class ParquetFilter {
-
-    public static FilterPredicate toParquetFilter(ScanPredicate predicate) {
-        ScanPredicate.FilterOp filterOp = predicate.op;
-        switch (filterOp) {
-            case FILTER_IN:
-                return convertIn(predicate);
-            case FILTER_NOT_IN:
-                return convertNotIn(predicate);
-            case FILTER_LESS:
-                return convertLess(predicate);
-            case FILTER_LARGER:
-                return convertLarger(predicate);
-            case FILTER_LESS_OR_EQUAL:
-                return convertLessOrEqual(predicate);
-            case FILTER_LARGER_OR_EQUAL:
-                return convertLargerOrEqual(predicate);
-            default:
-                break;
-        }
-        throw new RuntimeException("Unsupported ScanPredicate" + 
ScanPredicate.dump(new ScanPredicate[] {predicate}));
-    }
-
-    private static FilterPredicate convertNotIn(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue[] predicateValues = 
predicate.predicateValues();
-        FilterPredicate resultPredicate = null;
-        for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
-            if (resultPredicate == null) {
-                resultPredicate = makeNotEquals(colName, colType, 
predicateValue);
-            } else {
-                resultPredicate = FilterApi.and(resultPredicate, 
makeNotEquals(colName, colType, predicateValue));
-            }
-        }
-        return resultPredicate;
-    }
-
-    private static FilterPredicate convertIn(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue[] predicateValues = 
predicate.predicateValues();
-        FilterPredicate resultPredicate = null;
-        for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
-            if (resultPredicate == null) {
-                resultPredicate = makeEquals(colName, colType, predicateValue);
-            } else {
-                resultPredicate = FilterApi.or(resultPredicate, 
makeEquals(colName, colType, predicateValue));
-            }
-        }
-        return resultPredicate;
-    }
-
-    private static FilterPredicate convertLarger(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
-        return makeLarger(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate convertLargerOrEqual(ScanPredicate 
predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
-        return makeLargerOrEqual(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate convertLess(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
-        return makeLess(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate convertLessOrEqual(ScanPredicate predicate) 
{
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
-        return makeLessOrEqual(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate makeNotEquals(String colName, 
ColumnType.Type type,
-                                                 ScanPredicate.PredicateValue 
value) {
-        switch (type) {
-            case BOOLEAN:
-                return FilterApi.notEq(FilterApi.booleanColumn(colName), 
value.getBoolean());
-            case TINYINT:
-                return FilterApi.notEq(FilterApi.intColumn(colName), (int) 
value.getByte());
-            case SMALLINT:
-                return FilterApi.notEq(FilterApi.intColumn(colName), (int) 
value.getShort());
-            case INT:
-                return FilterApi.notEq(FilterApi.intColumn(colName), 
value.getInt());
-            case BIGINT:
-                return FilterApi.notEq(FilterApi.longColumn(colName), 
value.getLong());
-            case FLOAT:
-                return FilterApi.notEq(FilterApi.floatColumn(colName), 
value.getFloat());
-            case DOUBLE:
-                return FilterApi.notEq(FilterApi.doubleColumn(colName), 
value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.notEq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.notEq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
-        }
-    }
-
-
-    private static FilterPredicate makeEquals(String colName, ColumnType.Type 
type,
-                                              ScanPredicate.PredicateValue 
value) {
-        switch (type) {
-            case BOOLEAN:
-                return FilterApi.eq(FilterApi.booleanColumn(colName), 
value.getBoolean());
-            case TINYINT:
-                return FilterApi.eq(FilterApi.intColumn(colName), (int) 
value.getByte());
-            case SMALLINT:
-                return FilterApi.eq(FilterApi.intColumn(colName), (int) 
value.getShort());
-            case INT:
-                return FilterApi.eq(FilterApi.intColumn(colName), 
value.getInt());
-            case BIGINT:
-                return FilterApi.eq(FilterApi.longColumn(colName), 
value.getLong());
-            case FLOAT:
-                return FilterApi.eq(FilterApi.floatColumn(colName), 
value.getFloat());
-            case DOUBLE:
-                return FilterApi.eq(FilterApi.doubleColumn(colName), 
value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.eq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.eq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
-        }
-    }
-
-    private static FilterPredicate makeLarger(String colName, ColumnType.Type 
type,
-                                              ScanPredicate.PredicateValue 
value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.gt(FilterApi.intColumn(colName), (int) 
value.getByte());
-            case SMALLINT:
-                return FilterApi.gt(FilterApi.intColumn(colName), (int) 
value.getShort());
-            case INT:
-                return FilterApi.gt(FilterApi.intColumn(colName), 
value.getInt());
-            case BIGINT:
-                return FilterApi.gt(FilterApi.longColumn(colName), 
value.getLong());
-            case FLOAT:
-                return FilterApi.gt(FilterApi.floatColumn(colName), 
value.getFloat());
-            case DOUBLE:
-                return FilterApi.gt(FilterApi.doubleColumn(colName), 
value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.gt(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.gt(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
-        }
-
-    }
-
-    private static FilterPredicate makeLargerOrEqual(String colName, 
ColumnType.Type type,
-                                                     
ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.gtEq(FilterApi.intColumn(colName), (int) 
value.getByte());
-            case SMALLINT:
-                return FilterApi.gtEq(FilterApi.intColumn(colName), (int) 
value.getShort());
-            case INT:
-                return FilterApi.gtEq(FilterApi.intColumn(colName), 
value.getInt());
-            case BIGINT:
-                return FilterApi.gtEq(FilterApi.longColumn(colName), 
value.getLong());
-            case FLOAT:
-                return FilterApi.gtEq(FilterApi.floatColumn(colName), 
value.getFloat());
-            case DOUBLE:
-                return FilterApi.gtEq(FilterApi.doubleColumn(colName), 
value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.gtEq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.gtEq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
-        }
-
-    }
-
-    private static FilterPredicate makeLess(String colName, ColumnType.Type 
type, ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.lt(FilterApi.intColumn(colName), (int) 
value.getByte());
-            case SMALLINT:
-                return FilterApi.lt(FilterApi.intColumn(colName), (int) 
value.getShort());
-            case INT:
-                return FilterApi.lt(FilterApi.intColumn(colName), 
value.getInt());
-            case BIGINT:
-                return FilterApi.lt(FilterApi.longColumn(colName), 
value.getLong());
-            case FLOAT:
-                return FilterApi.lt(FilterApi.floatColumn(colName), 
value.getFloat());
-            case DOUBLE:
-                return FilterApi.lt(FilterApi.doubleColumn(colName), 
value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.lt(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.lt(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
-        }
-
-    }
-
-    private static FilterPredicate makeLessOrEqual(String colName, 
ColumnType.Type type,
-                                                   
ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.ltEq(FilterApi.intColumn(colName), (int) 
value.getByte());
-            case SMALLINT:
-                return FilterApi.ltEq(FilterApi.intColumn(colName), (int) 
value.getShort());
-            case INT:
-                return FilterApi.ltEq(FilterApi.intColumn(colName), 
value.getInt());
-            case BIGINT:
-                return FilterApi.ltEq(FilterApi.longColumn(colName), 
value.getLong());
-            case FLOAT:
-                return FilterApi.ltEq(FilterApi.floatColumn(colName), 
value.getFloat());
-            case DOUBLE:
-                return FilterApi.ltEq(FilterApi.doubleColumn(colName), 
value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.ltEq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.ltEq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
-        }
-    }
-}
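
The deleted ParquetFilter translated each ScanPredicate into a parquet-column FilterPredicate, one typed FilterApi call per comparison; this commit replaces that path with Substrait plans pushed through NativeIOReader.addFilterProto. For reference, a minimal example of what the removed makeLarger INT branch produced (column name and value are illustrative):

    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;

    public class OldParquetFilterSketch {
        // Equivalent of makeLarger(colName, INT, value) for "id > 5".
        public static FilterPredicate idGreaterThanFive() {
            return FilterApi.gt(FilterApi.intColumn("id"), 5);
        }
    }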
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index bac2346185d..92e576fb2e1 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -565,18 +565,77 @@ under the License.
             <artifactId>hadoop-auth</artifactId>
         </dependency>
 
+        <!-- lakesoul -->
         <dependency>
             <groupId>com.dmetasoul</groupId>
-            <artifactId>lakesoul-common</artifactId>
-            <version>2.5.4</version>
-            <classifier>shaded</classifier>
+            <artifactId>lakesoul-io-java</artifactId>
+            <version>${lakesoul.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>*</groupId>
+                    <groupId>io.netty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>arrow-vector</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>arrow-memory-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>arrow-memory-netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>arrow-format</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.datatype</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.json4s</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.postgresql</groupId>
+                    <artifactId>postgresql</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.7.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.iceberg</groupId>
@@ -1220,4 +1279,4 @@ under the License.
             </extension>
         </extensions>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
index dd8342ad660..b27b4c6fc0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
@@ -25,15 +25,19 @@ import org.apache.doris.datasource.property.PropertyConverter;
 
 import com.dmetasoul.lakesoul.meta.DBManager;
 import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
-import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 
 public class LakeSoulExternalCatalog extends ExternalCatalog {
 
-    private DBManager dbManager;
+    private static final Logger LOG = LogManager.getLogger(LakeSoulExternalCatalog.class);
+
+    private DBManager lakesoulMetadataManager;
 
     private final Map<String, String> props;
 
@@ -48,49 +52,47 @@ public class LakeSoulExternalCatalog extends ExternalCatalog {
     @Override
     protected List<String> listDatabaseNames() {
         initLocalObjectsImpl();
-        return dbManager.listNamespaces();
+        return lakesoulMetadataManager.listNamespaces();
     }
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
-        List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName);
-        List<String> tableNames = Lists.newArrayList();
-        for (TableInfo item : tifs) {
-            tableNames.add(item.getTableName());
-        }
+        List<String> tableNames = 
lakesoulMetadataManager.listTableNamesByNamespace(dbName);
         return tableNames;
     }
 
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
         makeSureInitialized();
-        TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName, tblName);
-
+        TableInfo tableInfo = lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName);
         return null != tableInfo;
     }
 
     @Override
     protected void initLocalObjectsImpl() {
-        if (dbManager == null) {
-            if (props != null) {
-                if (props.containsKey(DBUtil.urlKey)) {
-                    System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey));
-                }
-                if (props.containsKey(DBUtil.usernameKey)) {
-                    System.setProperty(DBUtil.usernameKey, props.get(DBUtil.usernameKey));
-                }
-                if (props.containsKey(DBUtil.passwordKey)) {
-                    System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey));
-                }
+        if (props != null) {
+            if (props.containsKey(DBUtil.urlKey)) {
+                System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey));
+            }
+            if (props.containsKey(DBUtil.usernameKey)) {
+                System.setProperty(DBUtil.usernameKey, props.get(DBUtil.usernameKey));
+            }
+            if (props.containsKey(DBUtil.passwordKey)) {
+                System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey));
+            }
             }
-            dbManager = new DBManager();
         }
+        lakesoulMetadataManager = new DBManager();
     }
 
     public TableInfo getLakeSoulTable(String dbName, String tblName) {
         makeSureInitialized();
-        return dbManager.getTableInfoByNameAndNamespace(tblName, dbName);
+        return lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName);
+    }
+
+    public List<PartitionInfo> listPartitionInfo(String tableId) {
+        makeSureInitialized();
+        return lakesoulMetadataManager.getAllPartitionInfo(tableId);
     }
 }
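
A short sketch of the new metadata path these changes add (database and table names are illustrative):

    import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
    import com.dmetasoul.lakesoul.meta.entity.TableInfo;
    import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;

    import java.util.List;

    public class CatalogUsageSketch {
        // Resolves TableInfo via DBManager, then lists all partitions for its table id.
        public static List<PartitionInfo> partitionsOf(LakeSoulExternalCatalog catalog) {
            TableInfo info = catalog.getLakeSoulTable("default", "orders");
            return catalog.listPartitionInfo(info.getTableId());
        }
    }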
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
index 46e8d1db47c..9dd2f4811e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
@@ -25,17 +25,23 @@ import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ExternalAnalysisTask;
 import org.apache.doris.thrift.TLakeSoulTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
 import com.google.common.collect.Lists;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -45,11 +51,24 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class LakeSoulExternalTable extends ExternalTable {
-
+    private static final Logger LOG = LogManager.getLogger(LakeSoulExternalTable.class);
     public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;
 
+    public final String tableId;
+
     public LakeSoulExternalTable(long id, String name, String dbName, LakeSoulExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE);
+        TableInfo tableInfo = getLakeSoulTableInfo();
+        if (tableInfo == null) {
+            throw new RuntimeException(String.format("LakeSoul table %s.%s does not exist", dbName, name));
+        }
+        tableId = tableInfo.getTableId();
+    }
+
+    @Override
+    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+        makeSureInitialized();
+        return new ExternalAnalysisTask(info);
     }
 
     private Type arrowFiledToDorisType(Field field) {
@@ -150,6 +169,7 @@ public class LakeSoulExternalTable extends ExternalTable {
         String tableSchema = tableInfo.getTableSchema();
         DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
         Schema schema;
+        LOG.info("tableSchema={}", tableSchema);
         try {
             schema = Schema.fromJSON(tableSchema);
         } catch (IOException e) {
@@ -174,6 +194,10 @@ public class LakeSoulExternalTable extends ExternalTable {
         return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name);
     }
 
+    public List<PartitionInfo> listPartitionInfo() {
+        return ((LakeSoulExternalCatalog) catalog).listPartitionInfo(tableId);
+    }
+
     public String tablePath() {
         return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name).getTablePath();
     }
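
The listPartitionInfo accessor added here feeds the partition pruning implemented in the new fe-core LakeSoulUtils below. A hedged sketch of the intended call (using getName() for the table-name argument and an externally supplied columnNameToRange map are assumptions; per the diffstat, the real caller is presumably LakeSoulScanNode):

    import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
    import org.apache.arrow.vector.types.pojo.Schema;
    import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
    import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
    import org.apache.doris.planner.ColumnRange;

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    public class PruningSketch {
        // Converts per-column ranges into a Substrait filter and keeps only
        // the partitions whose values satisfy it.
        public static List<PartitionInfo> prune(LakeSoulExternalTable table, Schema partitionArrowSchema,
                Map<String, ColumnRange> columnNameToRange) throws IOException {
            return LakeSoulUtils.applyPartitionFilters(
                    table.listPartitionInfo(), table.getName(), partitionArrowSchema, columnNameToRange);
        }
    }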
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
new file mode 100644
index 00000000000..fba74d4f978
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.Subquery;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TExprOpcode;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import io.substrait.expression.Expression;
+import io.substrait.extension.DefaultExtensionCatalog;
+import io.substrait.type.Type;
+import io.substrait.type.TypeCreator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class LakeSoulUtils {
+
+    public static final String FILE_NAMES = "file_paths";
+    public static final String PRIMARY_KEYS = "primary_keys";
+    public static final String SCHEMA_JSON = "table_schema";
+    public static final String PARTITION_DESC = "partition_descs";
+    public static final String REQUIRED_FIELDS = "required_fields";
+    public static final String OPTIONS = "options";
+    public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+    public static final String CDC_COLUMN = "lakesoul_cdc_change_column";
+    public static final String LIST_DELIM = ";";
+    public static final String PARTITIONS_KV_DELIM = "=";
+    public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+    public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+    public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+    public static final String FS_S3A_REGION = "fs.s3a.endpoint.region";
+    public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
+
+    private static final OffsetDateTime EPOCH;
+    private static final LocalDate EPOCH_DAY;
+
+    static {
+        EPOCH = Instant.ofEpochSecond(0L).atOffset(ZoneOffset.UTC);
+        EPOCH_DAY = EPOCH.toLocalDate();
+    }
+
+    public static List<PartitionInfo> applyPartitionFilters(
+            List<PartitionInfo> allPartitionInfo,
+            String tableName,
+            Schema partitionArrowSchema,
+            Map<String, ColumnRange> columnNameToRange
+    ) throws IOException {
+
+        Expression conjunctionFilter = null;
+        for (Field field : partitionArrowSchema.getFields()) {
+            ColumnRange columnRange = columnNameToRange.get(field.getName());
+            if (columnRange != null) {
+                Expression expr = columnRangeToSubstraitFilter(field, columnRange);
+                if (expr != null) {
+                    if (conjunctionFilter == null) {
+                        conjunctionFilter = expr;
+                    } else {
+                        conjunctionFilter = SubstraitUtil.and(conjunctionFilter, expr);
+                    }
+                }
+            }
+        }
+        return SubstraitUtil.applyPartitionFilters(
+            allPartitionInfo,
+            partitionArrowSchema,
+            SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName)
+        );
+    }
+
+    public static Expression columnRangeToSubstraitFilter(
+            Field columnField,
+            ColumnRange columnRange
+    ) throws IOException {
+        Optional<RangeSet<ColumnBound>> rangeSetOpt = 
columnRange.getRangeSet();
+        if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) {
+            return SubstraitUtil.CONST_TRUE;
+        } else {
+            RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();
+            if (rangeSet.isEmpty()) {
+                return SubstraitUtil.CONST_TRUE;
+            } else {
+                Expression conjunctionFilter = null;
+                for (Range range : rangeSet.asRanges()) {
+                    Expression expr = rangeToSubstraitFilter(columnField, range);
+                    if (expr != null) {
+                        if (conjunctionFilter == null) {
+                            conjunctionFilter = expr;
+                        } else {
+                            conjunctionFilter = SubstraitUtil.or(conjunctionFilter, expr);
+                        }
+                    }
+                }
+                return conjunctionFilter;
+            }
+        }
+    }
+
+    public static Expression rangeToSubstraitFilter(Field columnField, Range range) throws IOException {
+        if (!range.hasLowerBound() && !range.hasUpperBound()) {
+            // Range.all()
+            return SubstraitUtil.CONST_TRUE;
+        } else {
+            Expression upper = SubstraitUtil.CONST_TRUE;
+            if (range.hasUpperBound()) {
+                String func = range.upperBoundType() == BoundType.OPEN ? "lt:any_any" : "lte:any_any";
+                Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField);
+                Expression right = SubstraitUtil.anyToSubstraitLiteral(
+                        SubstraitUtil.arrowFieldToSubstraitType(columnField),
+                        ((ColumnBound) range.upperEndpoint()).getValue().getRealValue());
+                upper = SubstraitUtil.makeBinary(
+                        left,
+                        right,
+                        DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                        func,
+                        TypeCreator.NULLABLE.BOOLEAN
+                );
+            }
+            Expression lower = SubstraitUtil.CONST_TRUE;
+            if (range.hasLowerBound()) {
+                String func = range.lowerBoundType() == BoundType.OPEN ? "gt:any_any" : "gte:any_any";
+                Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField);
+                Expression right = SubstraitUtil.anyToSubstraitLiteral(
+                        SubstraitUtil.arrowFieldToSubstraitType(columnField),
+                        ((ColumnBound) range.lowerEndpoint()).getValue().getRealValue());
+                lower = SubstraitUtil.makeBinary(
+                        left,
+                        right,
+                        DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                        func,
+                        TypeCreator.NULLABLE.BOOLEAN
+                );
+            }
+            return SubstraitUtil.and(upper, lower);
+        }
+    }
+
+    public static io.substrait.proto.Plan getPushPredicate(
+            List<Expr> conjuncts,
+            String tableName,
+            Schema tableSchema,
+            Schema partitionArrowSchema,
+            Map<String, String> properties,
+            boolean incRead
+    ) throws IOException {
+
+        Set<String> partitionColumn =
+                partitionArrowSchema
+                    .getFields()
+                    .stream()
+                    .map(Field::getName)
+                    .collect(Collectors.toSet());
+        Expression conjunctionFilter = null;
+        String cdcColumn = properties.get(CDC_COLUMN);
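+        // For CDC tables read non-incrementally, start from a merge-on-read filter on the CDC column.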
+        if (cdcColumn != null && !incRead) {
+            conjunctionFilter = SubstraitUtil.cdcColumnMergeOnReadFilter(tableSchema.findField(cdcColumn));
+        }
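+        // Predicates that touch only partition columns are handled by partition pruning, not pushed down.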
+        for (Expr expr : conjuncts) {
+            if (!isAllPartitionPredicate(expr, partitionColumn)) {
+                Expression predicate = convertToSubstraitExpr(expr, tableSchema);
+                if (predicate != null) {
+                    if (conjunctionFilter == null) {
+                        conjunctionFilter = predicate;
+                    } else {
+                        conjunctionFilter = SubstraitUtil.and(conjunctionFilter, predicate);
+                    }
+                }
+            }
+        }
+        if (conjunctionFilter == null) {
+            return null;
+        }
+        return SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName);
+    }
+
+    public static boolean isAllPartitionPredicate(Expr predicate, Set<String> partitionColumns) {
+        if (predicate == null) {
+            return false;
+        }
+        if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+            return isAllPartitionPredicate(compoundPredicate.getChild(0), partitionColumns)
+                && isAllPartitionPredicate(compoundPredicate.getChild(1), partitionColumns);
+        }
+        // Make sure the col slot is always first
+        SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0));
+        LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(1));
+        if (slotRef == null || literalExpr == null) {
+            return false;
+        }
+        String colName = slotRef.getColumnName();
+        return partitionColumns.contains(colName);
+    }
+
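+    // Unwraps an implicit CAST so that "cast(col) op literal" is treated like "col op literal".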
+    public static SlotRef convertDorisExprToSlotRef(Expr expr) {
+        SlotRef slotRef = null;
+        if (expr instanceof SlotRef) {
+            slotRef = (SlotRef) expr;
+        } else if (expr instanceof CastExpr) {
+            if (expr.getChild(0) instanceof SlotRef) {
+                slotRef = (SlotRef) expr.getChild(0);
+            }
+        }
+        return slotRef;
+    }
+
+    public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) {
+        LiteralExpr literalExpr = null;
+        if (expr instanceof LiteralExpr) {
+            literalExpr = (LiteralExpr) expr;
+        } else if (expr instanceof CastExpr) {
+            if (expr.getChild(0) instanceof LiteralExpr) {
+                literalExpr = (LiteralExpr) expr.getChild(0);
+            }
+        }
+        return literalExpr;
+    }
+
+    public static Expression convertToSubstraitExpr(Expr predicate, Schema tableSchema) throws IOException {
+        if (predicate == null) {
+            return null;
+        }
+        if (predicate instanceof BoolLiteral) {
+            BoolLiteral boolLiteral = (BoolLiteral) predicate;
+            boolean value = boolLiteral.getValue();
+            if (value) {
+                return SubstraitUtil.CONST_TRUE;
+            } else {
+                return SubstraitUtil.CONST_FALSE;
+            }
+        }
+        if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+            switch (compoundPredicate.getOp()) {
+                case AND: {
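+                    // An AND conjunct that cannot be converted may be dropped: the scan then reads a superset of rows.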
+                    Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+                    if (left != null && right != null) {
+                        return SubstraitUtil.and(left, right);
+                    } else if (left != null) {
+                        return left;
+                    } else {
+                        return right;
+                    }
+                }
+                case OR: {
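+                    // OR is convertible only if both sides are; dropping one side would wrongly narrow the result.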
+                    Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+                    if (left != null && right != null) {
+                        return SubstraitUtil.or(left, right);
+                    }
+                    return null;
+                }
+                case NOT: {
+                    Expression child = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    if (child != null) {
+                        return SubstraitUtil.not(child);
+                    }
+                    return null;
+                }
+                default:
+                    return null;
+            }
+        } else if (predicate instanceof InPredicate) {
+            InPredicate inExpr = (InPredicate) predicate;
+            if (inExpr.contains(Subquery.class)) {
+                return null;
+            }
+            SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0));
+            if (slotRef == null) {
+                return null;
+            }
+            String colName = slotRef.getColumnName();
+            Field field = tableSchema.findField(colName);
+            Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field);
+
+            Type type = field.getType().accept(
+                new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable())
+            );
+            List<Expression.Literal> valueList = new ArrayList<>();
+            for (int i = 1; i < inExpr.getChildren().size(); ++i) {
+                if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
+                    return null;
+                }
+                LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
+                Object value = extractDorisLiteral(type, literalExpr);
+                if (value == null) {
+                    return null;
+                }
+                valueList.add(SubstraitUtil.anyToSubstraitLiteral(type, value));
+            }
+            if (inExpr.isNotIn()) {
+                // not in
+                return SubstraitUtil.notIn(fieldRef, valueList);
+            } else {
+                // in
+                return SubstraitUtil.in(fieldRef, valueList);
+            }
+        }
+        return convertBinaryExpr(predicate, tableSchema);
+    }
+
+    private static Expression convertBinaryExpr(Expr dorisExpr, Schema tableSchema) throws IOException {
+        TExprOpcode opcode = dorisExpr.getOpcode();
+        // Make sure the col slot is always first
+        SlotRef slotRef = convertDorisExprToSlotRef(dorisExpr.getChild(0));
+        LiteralExpr literalExpr = convertDorisExprToLiteralExpr(dorisExpr.getChild(1));
+        if (slotRef == null || literalExpr == null) {
+            return null;
+        }
+        String colName = slotRef.getColumnName();
+        Field field = tableSchema.findField(colName);
+        Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field);
+
+        Type type = field.getType().accept(
+            new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable())
+        );
+        Object value = extractDorisLiteral(type, literalExpr);
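+        // A null literal is only pushable as "col <=> NULL", which becomes an is_null test below.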
+        if (value == null) {
+            if (opcode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) {
+                return SubstraitUtil.makeUnary(
+                        fieldRef,
+                        DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                        "is_null:any",
+                        TypeCreator.NULLABLE.BOOLEAN);
+            } else {
+                return null;
+            }
+        }
+        Expression literal = SubstraitUtil.anyToSubstraitLiteral(
+                type,
+                value
+        );
+
+        String namespace;
+        String func;
+        switch (opcode) {
+            case EQ:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "equal:any_any";
+                break;
+            case EQ_FOR_NULL:
+                // A null-safe equal (<=>) that reaches this point carries a non-null literal
+                // (the NULL case returned above), so it reduces to plain equality.
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "equal:any_any";
+                break;
+            case NE:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "not_equal:any_any";
+                break;
+            case GE:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "gte:any_any";
+                break;
+            case GT:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "gt:any_any";
+                break;
+            case LE:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "lte:any_any";
+                break;
+            case LT:
+                namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                func = "lt:any_any";
+                break;
+            case INVALID_OPCODE:
+                if (dorisExpr instanceof IsNullPredicate) {
+                    if (((IsNullPredicate) dorisExpr).isNotNull()) {
+                        namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                        func = "is_not_null:any";
+                    } else {
+                        namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+                        func = "is_null:any";
+                        func = "is_null:any";
+                    }
+                    break;
+                }
+                return null;
+            default:
+                return null;
+        }
+        return SubstraitUtil.makeBinary(fieldRef, literal, namespace, func, TypeCreator.NULLABLE.BOOLEAN);
+    }
+
+    public static Object extractDorisLiteral(Type type, LiteralExpr expr) {
+
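+        // Returns null when the literal cannot be represented as the target Substrait type;
+        // callers treat that as "not pushable".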
+        if (expr instanceof BoolLiteral) {
+            if (type instanceof Type.Bool) {
+                return ((BoolLiteral) expr).getValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof DateLiteral) {
+            DateLiteral dateLiteral = (DateLiteral) expr;
+            if (type instanceof Type.Date) {
+                if (dateLiteral.getType().isDatetimeV2() || dateLiteral.getType().isDatetime()) {
+                    return null;
+                }
+                return (int) LocalDate.of((int) dateLiteral.getYear(),
+                        (int) dateLiteral.getMonth(),
+                        (int) dateLiteral.getDay()).toEpochDay();
+            }
+            if (type instanceof Type.TimestampTZ || type instanceof Type.Timestamp) {
+                return dateLiteral.getLongValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof DecimalLiteral) {
+            DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
+            if (type instanceof Type.Decimal) {
+                return decimalLiteral.getValue();
+            } else if (type instanceof Type.FP64) {
+                return decimalLiteral.getDoubleValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof FloatLiteral) {
+            FloatLiteral floatLiteral = (FloatLiteral) expr;
+
+            if (floatLiteral.getType() == org.apache.doris.catalog.Type.FLOAT) {
+                return type instanceof Type.FP32
+                    || type instanceof Type.FP64
+                    || type instanceof Type.Decimal ? floatLiteral.getValue() : null;
+            } else {
+                return type instanceof Type.FP64
+                    || type instanceof Type.Decimal ? floatLiteral.getValue() : null;
+            }
+        } else if (expr instanceof IntLiteral) {
+            if (type instanceof Type.I8
+                    || type instanceof Type.I16
+                    || type instanceof Type.I32
+                    || type instanceof Type.I64
+                    || type instanceof Type.FP32
+                    || type instanceof Type.FP64
+                    || type instanceof Type.Decimal
+                    || type instanceof Type.Date
+            ) {
+                return expr.getRealValue();
+            }
+            if (!expr.getType().isInteger32Type()) {
+                if (type instanceof Type.Time || type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) {
+                    return expr.getLongValue();
+                }
+            }
+
+        } else if (expr instanceof StringLiteral) {
+            String value = expr.getStringValue();
+            if (type instanceof Type.Str) {
+                return value;
+            }
+            if (type instanceof Type.Date) {
+                try {
+                    return (int) ChronoUnit.DAYS.between(
+                        EPOCH_DAY,
+                        LocalDate.parse(value, DateTimeFormatter.ISO_LOCAL_DATE));
+                } catch (DateTimeParseException e) {
+                    return null;
+                }
+            }
+            if (type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) {
+                try {
+                    return ChronoUnit.MICROS.between(
+                        EPOCH,
+                        OffsetDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME));
+                } catch (DateTimeParseException e) {
+                    return null;
+                }
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index 1779aeaca10..dd15ec310de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -22,9 +22,13 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
+import org.apache.doris.datasource.property.constants.MinioProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -34,15 +38,26 @@ import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TLakeSoulFileDesc;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
 import com.dmetasoul.lakesoul.meta.DBUtil;
 import com.dmetasoul.lakesoul.meta.DataFileInfo;
 import com.dmetasoul.lakesoul.meta.DataOperation;
 import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import com.lakesoul.shaded.com.alibaba.fastjson.JSON;
-import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject;
+import io.substrait.proto.Plan;
+import lombok.SneakyThrows;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -52,19 +67,55 @@ import java.util.stream.Collectors;
 
 public class LakeSoulScanNode extends FileQueryScanNode {
 
-    protected final LakeSoulExternalTable lakeSoulExternalTable;
+    private static final Logger LOG = LogManager.getLogger(LakeSoulScanNode.class);
 
-    protected final TableInfo table;
+    protected LakeSoulExternalTable lakeSoulExternalTable;
+
+    String tableName;
+
+    String location;
+
+    String partitions;
+
+    Schema tableArrowSchema;
+
+    Schema partitionArrowSchema;
+    private Map<String, String> tableProperties;
+
+    String readType;
 
     public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
         super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv);
+    }
+
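+    // Metadata is resolved in doInitialize() rather than in the constructor, once the tuple descriptor is ready.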
+    @Override
+    protected void doInitialize() throws UserException {
+        super.doInitialize();
         lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable();
-        table = lakeSoulExternalTable.getLakeSoulTableInfo();
+        TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+        location = tableInfo.getTablePath();
+        tableName = tableInfo.getTableName();
+        partitions = tableInfo.getPartitions();
+        readType = LakeSoulOptions.ReadType$.MODULE$.FULL_READ();
+        try {
+            tableProperties = new ObjectMapper().readValue(
+                tableInfo.getProperties(),
+                new TypeReference<Map<String, String>>() {}
+            );
+            tableArrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
+            List<Field> partitionFields =
+                    DBUtil.parseTableInfoPartitions(partitions)
+                        .rangeKeys
+                        .stream()
+                        .map(tableArrowSchema::findField)
+                        .collect(Collectors.toList());
+            partitionArrowSchema = new Schema(partitionFields);
+        } catch (IOException e) {
+            throw new UserException(e);
+        }
     }
 
     @Override
     protected TFileType getLocationType() throws UserException {
-        String location = table.getTablePath();
         return getLocationType(location);
     }
 
@@ -81,12 +132,12 @@ public class LakeSoulScanNode extends FileQueryScanNode {
 
     @Override
     protected List<String> getPathPartitionKeys() throws UserException {
-        return new ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys);
+        return new ArrayList<>(DBUtil.parseTableInfoPartitions(partitions).rangeKeys);
     }
 
     @Override
     protected TableIf getTargetTable() throws UserException {
-        return lakeSoulExternalTable;
+        return desc.getTable();
     }
 
     @Override
@@ -94,13 +145,21 @@ public class LakeSoulScanNode extends FileQueryScanNode {
         return lakeSoulExternalTable.getHadoopProperties();
     }
 
+    @SneakyThrows
     @Override
     protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}", rangeDesc);
+        }
         if (split instanceof LakeSoulSplit) {
             setLakeSoulParams(rangeDesc, (LakeSoulSplit) split);
         }
     }
 
+    public ExternalCatalog getCatalog() {
+        return lakeSoulExternalTable.getCatalog();
+    }
+
     public static boolean isExistHashPartition(TableInfo tif) {
         JSONObject tableProperties = JSON.parseObject(tif.getProperties());
         if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM())
@@ -111,13 +170,53 @@ public class LakeSoulScanNode extends FileQueryScanNode {
         }
     }
 
-    public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) {
+    public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) throws IOException {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value());
         TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc();
         fileDesc.setFilePaths(lakeSoulSplit.getPaths());
         fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys());
         fileDesc.setTableSchema(lakeSoulSplit.getTableSchema());
+
+        JSONObject options = new JSONObject();
+        Plan predicate = LakeSoulUtils.getPushPredicate(
+                conjuncts,
+                tableName,
+                tableArrowSchema,
+                partitionArrowSchema,
+                tableProperties,
+                readType.equals(LakeSoulOptions.ReadType$.MODULE$.INCREMENTAL_READ()));
+        if (predicate != null) {
+            options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, SubstraitUtil.encodeBase64String(predicate));
+        }
+        Map<String, String> catalogProps = getCatalog().getProperties();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}", catalogProps);
+        }
+
+        if (catalogProps.get(S3Properties.Env.ENDPOINT) != null) {
+            options.put(LakeSoulUtils.FS_S3A_ENDPOINT, catalogProps.get(S3Properties.Env.ENDPOINT));
+            // The catalog properties, not the options map being built here, tell us whether this is MinIO.
+            if (catalogProps.containsKey(MinioProperties.ENDPOINT)) {
+                // Use path style access for MinIO
+                options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "true");
+            } else {
+                // Use virtual hosted style access for all other S3-compatible storage services
+                options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "false");
+            }
+            if (catalogProps.get(S3Properties.Env.ACCESS_KEY) != null) {
+                options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY, catalogProps.get(S3Properties.Env.ACCESS_KEY));
+            }
+            if (catalogProps.get(S3Properties.Env.SECRET_KEY) != null) {
+                options.put(LakeSoulUtils.FS_S3A_SECRET_KEY, catalogProps.get(S3Properties.Env.SECRET_KEY));
+            }
+            if (catalogProps.get(S3Properties.Env.REGION) != null) {
+                options.put(LakeSoulUtils.FS_S3A_REGION, catalogProps.get(S3Properties.Env.REGION));
+            }
+        }
+
+        fileDesc.setOptions(JSON.toJSONString(options));
+
         fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc()
                 .entrySet().stream().map(entry ->
                         String.format("%s=%s", entry.getKey(), 
entry.getValue())).collect(Collectors.toList()));
@@ -126,24 +225,51 @@ public class LakeSoulScanNode extends FileQueryScanNode {
     }
 
     public List<Split> getSplits() throws UserException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getSplits with columnFilters={}", columnFilters);
+            LOG.debug("getSplits with columnNameToRange={}", 
columnNameToRange);
+            LOG.debug("getSplits with conjuncts={}", conjuncts);
+        }
+
+        List<PartitionInfo> allPartitionInfo = lakeSoulExternalTable.listPartitionInfo();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("allPartitionInfo={}", allPartitionInfo);
+        }
+        List<PartitionInfo> filteredPartitionInfo = allPartitionInfo;
+        try {
+            filteredPartitionInfo =
+                    LakeSoulUtils.applyPartitionFilters(
+                        allPartitionInfo,
+                        tableName,
+                        partitionArrowSchema,
+                        columnNameToRange
+                    );
+        } catch (IOException e) {
+            throw new UserException(e);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("filteredPartitionInfo={}", filteredPartitionInfo);
+        }
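+        // List data files only for the partitions that survive the metadata-level filter.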
+        DataFileInfo[] dataFileInfos = DataOperation.getTableDataInfo(filteredPartitionInfo);
+
         List<Split> splits = new ArrayList<>();
        Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition = new LinkedHashMap<>();
-        TableInfo tif = table;
-        DataFileInfo[] dfinfos = DataOperation.getTableDataInfo(table.getTableId());
-        for (DataFileInfo pif : dfinfos) {
-            if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) {
-                splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>())
-                        .computeIfAbsent(pif.file_bucket_id(), v -> new ArrayList<>())
-                        .add(pif.path());
+        TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+
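+        // Group files by range partition and, for hash-partitioned tables, by bucket id, so one bucket stays in one split.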
+        for (DataFileInfo fileInfo : dataFileInfos) {
+            if (isExistHashPartition(tableInfo) && fileInfo.file_bucket_id() != -1) {
+                splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>())
+                        .computeIfAbsent(fileInfo.file_bucket_id(), v -> new ArrayList<>())
+                        .add(fileInfo.path());
             } else {
-                
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new 
LinkedHashMap<>())
+                
splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> 
new LinkedHashMap<>())
                         .computeIfAbsent(-1, v -> new ArrayList<>())
-                        .add(pif.path());
+                        .add(fileInfo.path());
             }
         }
         List<String> pkKeys = null;
-        if (!table.getPartitions().equals(";")) {
-            pkKeys = Lists.newArrayList(table.getPartitions().split(";")[1].split(","));
+        if (!tableInfo.getPartitions().equals(";")) {
+            pkKeys = Lists.newArrayList(tableInfo.getPartitions().split(";")[1].split(","));
         }
 
        for (Map.Entry<String, Map<Integer, List<String>>> entry : splitByRangeAndHashPartition.entrySet()) {
@@ -161,7 +287,7 @@ public class LakeSoulScanNode extends FileQueryScanNode {
                         split.getValue(),
                         pkKeys,
                         rangeDesc,
-                        table.getTableSchema(),
+                        tableInfo.getTableSchema(),
                         0, 0, 0,
                         new String[0], null);
                 lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL);
@@ -169,8 +295,6 @@ public class LakeSoulScanNode extends FileQueryScanNode {
             }
         }
         return splits;
-
     }
-
 }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
new file mode 100644
index 00000000000..016819a382b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import io.substrait.expression.Expression;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LakeSoulPredicateTest {
+
+    public static Schema schema;
+
+    @BeforeClass
+    public static void before() throws AnalysisException, IOException {
+        schema = new Schema(
+                Arrays.asList(
+                    new Field("c_int", FieldType.nullable(new 
ArrowType.Int(32, true)), null),
+                    new Field("c_long", FieldType.nullable(new 
ArrowType.Int(64, true)), null),
+                    new Field("c_bool", FieldType.nullable(new 
ArrowType.Bool()), null),
+                    new Field("c_float", FieldType.nullable(new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null),
+                    new Field("c_double", FieldType.nullable(new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null),
+                    new Field("c_dec", FieldType.nullable(new 
ArrowType.Decimal(20, 10)), null),
+                    new Field("c_date", FieldType.nullable(new 
ArrowType.Date(DateUnit.DAY)), null),
+                    new Field("c_ts", FieldType.nullable(new 
ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), null),
+                    new Field("c_str", FieldType.nullable(new 
ArrowType.Utf8()), null)
+                ));
+    }
+
+    @Test
+    public void testBinaryPredicate() throws AnalysisException, IOException {
+        List<LiteralExpr> literalList = new ArrayList<LiteralExpr>() {{
+                add(new BoolLiteral(true));
+                add(new DateLiteral("2023-01-02", Type.DATEV2));
+                add(new DateLiteral("2024-01-02 12:34:56.123456", 
Type.DATETIMEV2));
+                add(new DecimalLiteral(new BigDecimal("1.23")));
+                add(new FloatLiteral(1.23, Type.FLOAT));
+                add(new FloatLiteral(3.456, Type.DOUBLE));
+                add(new IntLiteral(1, Type.TINYINT));
+                add(new IntLiteral(1, Type.SMALLINT));
+                add(new IntLiteral(1, Type.INT));
+                add(new IntLiteral(1, Type.BIGINT));
+                add(new StringLiteral("abc"));
+                add(new StringLiteral("2023-01-02"));
+                add(new StringLiteral("2023-01-02 01:02:03.456789"));
+            }};
+
+        List<SlotRef> slotRefs = new ArrayList<SlotRef>() {{
+                add(new SlotRef(new TableName(), "c_int"));
+                add(new SlotRef(new TableName(), "c_long"));
+                add(new SlotRef(new TableName(), "c_bool"));
+                add(new SlotRef(new TableName(), "c_float"));
+                add(new SlotRef(new TableName(), "c_double"));
+                add(new SlotRef(new TableName(), "c_dec"));
+                add(new SlotRef(new TableName(), "c_date"));
+                add(new SlotRef(new TableName(), "c_ts"));
+                add(new SlotRef(new TableName(), "c_str"));
+            }};
+
+        // true indicates support for pushdown
+        Boolean[][] expects = new Boolean[][] {
+                { // int
+                        false, false, false, false, false, false, true, true, true, true, false, false, false
+                },
+                { // long
+                        false, false, false, false, false, false, true, true, true, true, false, false, false
+                },
+                { // boolean
+                        true, false, false, false, false, false, false, false, false, false, false, false, false
+                },
+                { // float
+                        false, false, false, false, true, false, true, true, true, true, false, false, false
+                },
+                { // double
+                        false, false, false, true, true, true, true, true, true, true, false, false, false
+                },
+                { // decimal
+                        false, false, false, true, true, true, true, true, true, true, false, false, false
+                },
+                { // date
+                        false, true, false, false, false, false, true, true, true, true, false, true, false
+                },
+                { // timestamp
+                        false, true, true, false, false, false, false, false, false, true, false, false, false
+                },
+                { // string
+                        true, true, true, true, false, false, false, false, false, false, true, true, true
+                }
+        };
+
+        ArrayListMultimap<Boolean, Expr> validPredicateMap = ArrayListMultimap.create();
+
+        // binary predicate
+        for (int i = 0; i < slotRefs.size(); i++) {
+            final int loc = i;
+            List<Boolean> ret = literalList.stream().map(literal -> {
+                BinaryPredicate expr = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRefs.get(loc), literal);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                validPredicateMap.put(expression != null, expr);
+                return expression != null;
+            }).collect(Collectors.toList());
+            Assert.assertArrayEquals(expects[i], ret.toArray());
+        }
+
+        // in predicate
+        for (int i = 0; i < slotRefs.size(); i++) {
+            final int loc = i;
+            List<Boolean> ret = literalList.stream().map(literal -> {
+                InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), false);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                validPredicateMap.put(expression != null, expr);
+                return expression != null;
+            }).collect(Collectors.toList());
+            Assert.assertArrayEquals(expects[i], ret.toArray());
+        }
+
+        // not in predicate
+        for (int i = 0; i < slotRefs.size(); i++) {
+            final int loc = i;
+            List<Boolean> ret = literalList.stream().map(literal -> {
+                InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), true);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                validPredicateMap.put(expression != null, expr);
+                return expression != null;
+            }).collect(Collectors.toList());
+            Assert.assertArrayEquals(expects[i], ret.toArray());
+        }
+
+        // bool literal
+
+        Expression trueExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(true), schema);
+        Expression falseExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(false), schema);
+        Assert.assertEquals(SubstraitUtil.CONST_TRUE, trueExpr);
+        Assert.assertEquals(SubstraitUtil.CONST_FALSE, falseExpr);
+        validPredicateMap.put(true, new BoolLiteral(true));
+        validPredicateMap.put(true, new BoolLiteral(false));
+
+        List<Expr> validExprs = validPredicateMap.get(true);
+        List<Expr> invalidExprs = validPredicateMap.get(false);
+        // OR predicate
+        // both valid
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < validExprs.size(); j++) {
+                CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR,
+                        validExprs.get(i), validExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+                Assert.assertNotNull("pred: " + orPredicate.toSql(), expression);
+            }
+        }
+        // both invalid
+        for (int i = 0; i < invalidExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR,
+                        invalidExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+                Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+            }
+        }
+        // valid or invalid
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR,
+                        validExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+                Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+            }
+        }
+
+        // AND predicate
+        // both valid
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < validExprs.size(); j++) {
+                CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND,
+                        validExprs.get(i), validExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNotNull("pred: " + andPredicate.toSql(), expression);
+            }
+        }
+        // both invalid
+        for (int i = 0; i < invalidExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND,
+                        invalidExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNull("pred: " + andPredicate.toSql(), expression);
+            }
+        }
+        // valid and invalid
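+        // Dropping the unconvertible conjunct must yield exactly the plan of the convertible side alone.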
+        for (int i = 0; i < validExprs.size(); i++) {
+            for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND,
+                        validExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNotNull("pred: " + andPredicate.toSql(), expression);
+                Assert.assertEquals(
+                        SubstraitUtil.substraitExprToProto(LakeSoulUtils.convertToSubstraitExpr(validExprs.get(i), schema), "table"),
+                        SubstraitUtil.substraitExprToProto(expression, "table"));
+            }
+        }
+
+        // NOT predicate
+        // valid
+        for (int i = 0; i < validExprs.size(); i++) {
+            CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT,
+                    validExprs.get(i), null);
+            Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+            Assert.assertNotNull("pred: " + notPredicate.toSql(), expression);
+        }
+        // invalid
+        for (int i = 0; i < invalidExprs.size(); i++) {
+            CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT,
+                    invalidExprs.get(i), null);
+            Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+            Assert.assertNull("pred: " + notPredicate.toSql(), expression);
+        }
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index 42ab14bd3dc..b679f466f07 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -317,6 +317,8 @@ under the License.
         <hudi.version>0.14.1</hudi.version>
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
         <presto.hive.version>3.0.0-8</presto.hive.version>
+        <!-- lakesoul -->
+        <lakesoul.version>2.6.2</lakesoul.version>
 
         <parquet.version>1.13.1</parquet.version>
         <commons-collections.version>3.2.2</commons-collections.version>
@@ -1824,4 +1826,4 @@ under the License.
             </snapshots>
         </repository>
     </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
new file mode 100644
index 00000000000..01c5d981c49
--- /dev/null
+++ b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_lakesoul_filter", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableLakesoulTest")
+    // enable this case once the docker image is ready for regression tests
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String catalog_name = "lakesoul"
+        String db_name = "default"
+        String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+        String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+        String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+        String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+        String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+        String minio_endpoint = context.config.otherConfigs.get("lakesoulMinioEndpoint")
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog lakesoul  properties (
+            'type'='lakesoul',
+            'lakesoul.pg.username'='${pg_user}',
+            'lakesoul.pg.password'='${pg_pwd}',
+            'lakesoul.pg.url'='${pg_url}',
+            'minio.endpoint'='${minio_endpoint}',
+            'minio.access_key'='${minio_ak}',
+            'minio.secret_key'='${minio_sk}'
+            );"""
+
+        // analyze
+        sql """use `${catalog_name}`.`${db_name}`"""
+
+        sql """show tables;"""
+        // select
+        sql """select * from region;"""
+
+        sql """select * from nation;"""
+
+        sql """select * from nation where n_regionkey = 0 or n_nationkey > 
14;"""
+
+        sql """select * from nation where n_regionkey = 0 and n_nationkey > 
0;"""
+
+        sql """select * from nation where n_regionkey = 0;"""
+
+        // count query without any filter
+        sql """select count(*) from customer;"""
+
+        // filter by non-partition column
+        sql """select count(*) from customer where c_mktsegment='BUILDING';"""
+
+        // filter by partition column
+        sql """select count(*) from customer where c_nationkey=19;"""
+
+        // filter by both partition and non-partition column
+        sql """select count(*) from customer where c_mktsegment='BUILDING' and 
c_nationkey=19;"""
+
+        sql """select * from lineitem where l_shipdate <= DATE '1992-12-01' 
limit 10;"""
+
+        sql """select count(*) from part where p_type like 'MEDIUM 
POLISHED%';"""
+    }
+}
+
diff --git a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
index 9369a28e8fe..6a1f83a5e24 100644
--- a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
+++ b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
@@ -43,18 +43,18 @@ suite("test_external_table_lakesoul", "p2,external,lakesoul,external_remote,exte
                 """
 
             // analyze
-            sql """use `${catalog_name}`.`${db_name}`""" 
-        
-          sql q1
-           sql q2
-           sql q3
-           sql q4
-           sql q5
-           sql q6
-           sql q7
-           sql q8
-           sql q9
-           sql q11
+            sql """use `${catalog_name}`.`${db_name}`"""
+
+        sql q1
+        sql q2
+        sql q3
+        sql q4
+        sql q5
+        sql q6
+        sql q7
+        sql q8
+        sql q9
+        sql q11
 
     }
 }

