This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8c9fcf0a630 [fix](jni) remove 'push_down_predicates' and fix BE crash with decimal predicate (#32253) (#32599) 8c9fcf0a630 is described below commit 8c9fcf0a6309fa993e9e235dd6709c83872b895e Author: Mingyu Chen <morning...@163.com> AuthorDate: Thu Mar 21 13:48:51 2024 +0800 [fix](jni) remove 'push_down_predicates' and fix BE crash with decimal predicate (#32253) (#32599) --- be/src/vec/exec/jni_connector.cpp | 17 ++++++++++------- be/src/vec/exec/jni_connector.h | 3 +++ .../main/java/org/apache/doris/avro/AvroJNIScanner.java | 3 +-- .../main/java/org/apache/doris/hudi/HudiJniScanner.java | 16 +--------------- .../java/org/apache/doris/common/jni/JniScanner.java | 12 +++++++++--- .../org/apache/doris/common/jni/MockJniScanner.java | 11 +---------- .../apache/doris/maxcompute/MaxComputeJniScanner.java | 16 +++------------- .../java/org/apache/doris/paimon/PaimonJniScanner.java | 3 +-- 8 files changed, 29 insertions(+), 52 deletions(-) diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 00c0dcd8c7b..b802ff2431c 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -102,13 +102,16 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { Status JniConnector::init( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { - _generate_predicates(colname_to_value_range); - if (_predicates_length != 0 && _predicates != nullptr) { - int64_t predicates_address = (int64_t)_predicates.get(); - // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the - // serialized predicates in java side. - _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address)); - } + // TODO: This logic need to be changed. + // See the comment of "predicates" field in JniScanner.java + + // _generate_predicates(colname_to_value_range); + // if (_predicates_length != 0 && _predicates != nullptr) { + // int64_t predicates_address = (int64_t)_predicates.get(); + // // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the + // // serialized predicates in java side. + // _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address)); + // } return Status::OK(); } diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index ae3711f663a..22e33f01053 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -164,6 +164,9 @@ public: char_ptr += s->size; } } else { + // FIXME: it can not handle decimal type correctly. + // but this logic is deprecated and not used. + // so may be deleted or fixed later. for (const CppType* v : values) { int type_len = sizeof(CppType); *reinterpret_cast<int*>(char_ptr) = type_len; diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 17a185d03ae..dc845f43cb8 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -19,7 +19,6 @@ package org.apache.doris.avro; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; import org.apache.doris.thrift.TFileType; @@ -173,7 +172,7 @@ public class AvroJNIScanner extends JniScanner { try { initAvroFileContext(); initFieldInspector(); - initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize); + initTableInfo(requiredTypes, requiredFields, fetchSize); } catch (Exception e) { LOG.warn("Failed to init avro scanner. ", e); throw new RuntimeException(e); diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index 932b53b9a7c..0a1b69fcfb0 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -20,7 +20,6 @@ package org.apache.doris.hudi; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopUGI; @@ -59,7 +58,6 @@ public class HudiJniScanner extends JniScanner { private final int fetchSize; private final String debugString; private final HoodieSplit split; - private final ScanPredicate[] predicates; private final ClassLoader classLoader; private long getRecordReaderTimeNs = 0; @@ -123,20 +121,8 @@ public class HudiJniScanner extends JniScanner { .collect(Collectors.joining("\n")); try { this.classLoader = this.getClass().getClassLoader(); - String predicatesAddressString = params.remove("push_down_predicates"); this.fetchSize = fetchSize; this.split = new HoodieSplit(params); - if (predicatesAddressString == null) { - predicates = new ScanPredicate[0]; - } else { - long predicatesAddress = Long.parseLong(predicatesAddressString); - if (predicatesAddress != 0) { - predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes()); - LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); - } else { - predicates = new ScanPredicate[0]; - } - } } catch (Exception e) { LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e); throw e; @@ -147,7 +133,7 @@ public class HudiJniScanner extends JniScanner { public void open() throws IOException { Future<?> avroFuture = avroReadPool.submit(() -> { Thread.currentThread().setContextClassLoader(classLoader); - initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize); + initTableInfo(split.requiredTypes(), split.requiredFields(), fetchSize); long startTime = System.nanoTime(); // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck, // so use another process to kill this stuck process. diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java index 51ed837813c..cb191f4b038 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java @@ -33,6 +33,14 @@ public abstract class JniScanner { protected VectorTable vectorTable; protected String[] fields; protected ColumnType[] types; + @Deprecated + // This predicate is from BE, but no used. + // TODO: actually, we can generate the predicate for JNI scanner in FE's planner, + // then serialize it to BE, and BE pass it to JNI scanner directly. + // NO need to use this intermediate expression, because each JNI scanner has its + // own predicate expression format. + // For example, Paimon use "PaimonScannerUtils.decodeStringToObject(paimonPredicate)" + // to deserialize the predicate string to PaimonPredicate object. protected ScanPredicate[] predicates; protected int batchSize; @@ -50,11 +58,9 @@ public abstract class JniScanner { throw new UnsupportedOperationException(); } - protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates, - int batchSize) { + protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) { this.types = requiredTypes; this.fields = requiredFields; - this.predicates = predicates; this.batchSize = batchSize; } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java index bc7561e2a23..000f536e915 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java @@ -20,7 +20,6 @@ package org.apache.doris.common.jni; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ColumnValue; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.log4j.Logger; @@ -187,15 +186,7 @@ public class MockJniScanner extends JniScanner { for (int i = 0; i < types.length; i++) { columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]); } - ScanPredicate[] predicates = new ScanPredicate[0]; - if (params.containsKey("push_down_predicates")) { - long predicatesAddress = Long.parseLong(params.get("push_down_predicates")); - if (predicatesAddress != 0) { - predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes); - LOG.info("MockJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); - } - } - initTableInfo(columnTypes, requiredFields, predicates, batchSize); + initTableInfo(columnTypes, requiredFields, batchSize); } @Override diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index a87446b14b7..6a441a69293 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -19,7 +19,6 @@ package org.apache.doris.maxcompute; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import com.aliyun.odps.Column; import com.aliyun.odps.OdpsType; @@ -99,15 +98,7 @@ public class MaxComputeJniScanner extends JniScanner { for (int i = 0; i < types.length; i++) { columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]); } - ScanPredicate[] predicates = new ScanPredicate[0]; - if (params.containsKey("push_down_predicates")) { - long predicatesAddress = Long.parseLong(params.get("push_down_predicates")); - if (predicatesAddress != 0) { - predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes); - LOG.info("MaxComputeJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); - } - } - initTableInfo(columnTypes, requiredFields, predicates, batchSize); + initTableInfo(columnTypes, requiredFields, batchSize); } public void refreshTableScan() { @@ -133,9 +124,8 @@ public class MaxComputeJniScanner extends JniScanner { } @Override - protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates, - int batchSize) { - super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize); + protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) { + super.initTableInfo(requiredTypes, requiredFields, batchSize); readColumns = new ArrayList<>(); readColumnsToId = new HashMap<>(); for (int i = 0; i < fields.length; i++) { diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 69ec49e3364..ad45c729190 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -19,7 +19,6 @@ package org.apache.doris.paimon; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey; import org.apache.doris.paimon.PaimonTableCache.TableExt; @@ -82,7 +81,7 @@ public class PaimonJniScanner extends JniScanner { dbId = Long.parseLong(params.get("db_id")); tblId = Long.parseLong(params.get("tbl_id")); lastUpdateTime = Long.parseLong(params.get("last_update_time")); - initTableInfo(columnTypes, requiredFields, new ScanPredicate[0], batchSize); + initTableInfo(columnTypes, requiredFields, batchSize); paimonOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX)) .collect(Collectors --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org