This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8c9fcf0a630 [fix](jni) remove 'push_down_predicates' and fix BE crash with decimal predicate (#32253) (#32599)
8c9fcf0a630 is described below

commit 8c9fcf0a6309fa993e9e235dd6709c83872b895e
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Thu Mar 21 13:48:51 2024 +0800

    [fix](jni) remove 'push_down_predicates' and fix BE crash with decimal predicate (#32253) (#32599)
---
 be/src/vec/exec/jni_connector.cpp                       | 17 ++++++++++-------
 be/src/vec/exec/jni_connector.h                         |  3 +++
 .../main/java/org/apache/doris/avro/AvroJNIScanner.java |  3 +--
 .../main/java/org/apache/doris/hudi/HudiJniScanner.java | 16 +---------------
 .../java/org/apache/doris/common/jni/JniScanner.java    | 12 +++++++++---
 .../org/apache/doris/common/jni/MockJniScanner.java     | 11 +----------
 .../apache/doris/maxcompute/MaxComputeJniScanner.java   | 16 +++-------------
 .../java/org/apache/doris/paimon/PaimonJniScanner.java  |  3 +--
 8 files changed, 29 insertions(+), 52 deletions(-)

diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index 00c0dcd8c7b..b802ff2431c 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -102,13 +102,16 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
 
 Status JniConnector::init(
         std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
-    _generate_predicates(colname_to_value_range);
-    if (_predicates_length != 0 && _predicates != nullptr) {
-        int64_t predicates_address = (int64_t)_predicates.get();
-        // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
-        // serialized predicates in java side.
-        _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address));
-    }
+    // TODO: This logic need to be changed.
+    // See the comment of "predicates" field in JniScanner.java
+
+    // _generate_predicates(colname_to_value_range);
+    // if (_predicates_length != 0 && _predicates != nullptr) {
+    //     int64_t predicates_address = (int64_t)_predicates.get();
+    //     // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
+    //     // serialized predicates in java side.
+    //     _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address));
+    // }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index ae3711f663a..22e33f01053 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -164,6 +164,9 @@ public:
                     char_ptr += s->size;
                 }
             } else {
+                // FIXME: it can not handle decimal type correctly.
+                // but this logic is deprecated and not used.
+                // so may be deleted or fixed later.
                 for (const CppType* v : values) {
                     int type_len = sizeof(CppType);
                     *reinterpret_cast<int*>(char_ptr) = type_len;
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
index 17a185d03ae..dc845f43cb8 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -19,7 +19,6 @@ package org.apache.doris.avro;
 
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.TableSchema;
 import org.apache.doris.thrift.TFileType;
 
@@ -173,7 +172,7 @@ public class AvroJNIScanner extends JniScanner {
         try {
             initAvroFileContext();
             initFieldInspector();
-            initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
+            initTableInfo(requiredTypes, requiredFields, fetchSize);
         } catch (Exception e) {
             LOG.warn("Failed to init avro scanner. ", e);
             throw new RuntimeException(e);
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index 932b53b9a7c..0a1b69fcfb0 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -20,7 +20,6 @@ package org.apache.doris.hudi;
 
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopUGI;
 
@@ -59,7 +58,6 @@ public class HudiJniScanner extends JniScanner {
     private final int fetchSize;
     private final String debugString;
     private final HoodieSplit split;
-    private final ScanPredicate[] predicates;
     private final ClassLoader classLoader;
 
     private long getRecordReaderTimeNs = 0;
@@ -123,20 +121,8 @@ public class HudiJniScanner extends JniScanner {
                 .collect(Collectors.joining("\n"));
         try {
             this.classLoader = this.getClass().getClassLoader();
-            String predicatesAddressString = params.remove("push_down_predicates");
             this.fetchSize = fetchSize;
             this.split = new HoodieSplit(params);
-            if (predicatesAddressString == null) {
-                predicates = new ScanPredicate[0];
-            } else {
-                long predicatesAddress = Long.parseLong(predicatesAddressString);
-                if (predicatesAddress != 0) {
-                    predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
-                    LOG.info("HudiJniScanner gets pushed-down predicates:  " + ScanPredicate.dump(predicates));
-                } else {
-                    predicates = new ScanPredicate[0];
-                }
-            }
         } catch (Exception e) {
             LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e);
             throw e;
@@ -147,7 +133,7 @@ public class HudiJniScanner extends JniScanner {
     public void open() throws IOException {
         Future<?> avroFuture = avroReadPool.submit(() -> {
             Thread.currentThread().setContextClassLoader(classLoader);
-            initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
+            initTableInfo(split.requiredTypes(), split.requiredFields(), fetchSize);
             long startTime = System.nanoTime();
             // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
             // so use another process to kill this stuck process.
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
index 51ed837813c..cb191f4b038 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
@@ -33,6 +33,14 @@ public abstract class JniScanner {
     protected VectorTable vectorTable;
     protected String[] fields;
     protected ColumnType[] types;
+    @Deprecated
+    // This predicate is from BE, but no used.
+    // TODO: actually, we can generate the predicate for JNI scanner in FE's planner,
+    // then serialize it to BE, and BE pass it to JNI scanner directly.
+    // NO need to use this intermediate expression, because each JNI scanner has its
+    // own predicate expression format.
+    // For example, Paimon use "PaimonScannerUtils.decodeStringToObject(paimonPredicate)"
+    // to deserialize the predicate string to PaimonPredicate object.
+    // to deserialize the predicate string to PaimonPredicate object.
     protected ScanPredicate[] predicates;
     protected int batchSize;
 
@@ -50,11 +58,9 @@ public abstract class JniScanner {
         throw new UnsupportedOperationException();
     }
 
-    protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
-            int batchSize) {
+    protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) {
         this.types = requiredTypes;
         this.fields = requiredFields;
-        this.predicates = predicates;
         this.batchSize = batchSize;
     }
 
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
index bc7561e2a23..000f536e915 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
@@ -20,7 +20,6 @@ package org.apache.doris.common.jni;
 
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ColumnValue;
-import org.apache.doris.common.jni.vec.ScanPredicate;
 
 import org.apache.log4j.Logger;
 
@@ -187,15 +186,7 @@ public class MockJniScanner extends JniScanner {
         for (int i = 0; i < types.length; i++) {
             columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
         }
-        ScanPredicate[] predicates = new ScanPredicate[0];
-        if (params.containsKey("push_down_predicates")) {
-            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
-            if (predicatesAddress != 0) {
-                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
-                LOG.info("MockJniScanner gets pushed-down predicates:  " + ScanPredicate.dump(predicates));
-            }
-        }
-        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+        initTableInfo(columnTypes, requiredFields, batchSize);
     }
 
     @Override
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index a87446b14b7..6a441a69293 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -19,7 +19,6 @@ package org.apache.doris.maxcompute;
 
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
 
 import com.aliyun.odps.Column;
 import com.aliyun.odps.OdpsType;
@@ -99,15 +98,7 @@ public class MaxComputeJniScanner extends JniScanner {
         for (int i = 0; i < types.length; i++) {
             columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
         }
-        ScanPredicate[] predicates = new ScanPredicate[0];
-        if (params.containsKey("push_down_predicates")) {
-            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
-            if (predicatesAddress != 0) {
-                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
-                LOG.info("MaxComputeJniScanner gets pushed-down predicates:  " + ScanPredicate.dump(predicates));
-            }
-        }
-        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+        initTableInfo(columnTypes, requiredFields, batchSize);
     }
 
     public void refreshTableScan() {
@@ -133,9 +124,8 @@ public class MaxComputeJniScanner extends JniScanner {
     }
 
     @Override
-    protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
-                                 int batchSize) {
-        super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize);
+    protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) {
+        super.initTableInfo(requiredTypes, requiredFields, batchSize);
         readColumns = new ArrayList<>();
         readColumnsToId = new HashMap<>();
         for (int i = 0; i < fields.length; i++) {
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 69ec49e3364..ad45c729190 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -19,7 +19,6 @@ package org.apache.doris.paimon;
 
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.TableSchema;
 import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
 import org.apache.doris.paimon.PaimonTableCache.TableExt;
@@ -82,7 +81,7 @@ public class PaimonJniScanner extends JniScanner {
         dbId = Long.parseLong(params.get("db_id"));
         tblId = Long.parseLong(params.get("tbl_id"));
         lastUpdateTime = Long.parseLong(params.get("last_update_time"));
-        initTableInfo(columnTypes, requiredFields, new ScanPredicate[0], batchSize);
+        initTableInfo(columnTypes, requiredFields, batchSize);
         paimonOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                 .collect(Collectors


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to