This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3fe708cf56c3edcafa36f9e489e1d849e0fdf75a
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Fri Dec 16 09:41:43 2022 +0800

    [config](load) enable new load scan node by default (#14808)
    
    Set FE `enable_new_load_scan_node` to true by default.
    All load tasks (broker load, stream load, routine load, insert into)
    will then use FileScanNode instead of BrokerScanNode to read data.
    
    1. Support loading parquet files in stream load with the new load scan node.
    2. Fix a bug where the new parquet reader could not read columns without a
        logical or converted type.
    3. Change the jsonb parse function to "jsonb_parse_<nullable>_error_to_null",
        so that if the input string is not a valid JSON string, it returns null
        for the jsonb column in the load task.
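
    A self-contained Java sketch of the dispatch this default flip controls is
    below. The flag and class names (enable_new_load_scan_node,
    ExternalFileScanNode, BrokerScanNode) come from this commit; the surrounding
    types and the factory method are invented for illustration only.

        // Hypothetical sketch, not the FE planner code: which scan node a load picks.
        public class LoadScanDispatchSketch {
            // Mirrors FE Config.enable_new_load_scan_node, true after this commit.
            static boolean enableNewLoadScanNode = true;

            interface ScanNode {}
            static class ExternalFileScanNode implements ScanNode {} // new unified file scan path
            static class BrokerScanNode implements ScanNode {}       // legacy path

            static ScanNode createLoadScanNode() {
                return enableNewLoadScanNode ? new ExternalFileScanNode() : new BrokerScanNode();
            }

            public static void main(String[] args) {
                // Prints "ExternalFileScanNode" under the new default.
                System.out.println(createLoadScanNode().getClass().getSimpleName());
            }
        }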
---
 be/src/http/action/stream_load.cpp                 |  1 +
 be/src/util/jsonb_parser.h                         | 25 +++++---
 be/src/vec/exec/format/parquet/schema_desc.cpp     | 14 +++--
 be/src/vec/exec/scan/vfile_scanner.cpp             |  6 +-
 .../org/apache/doris/analysis/DataDescription.java | 27 ++++++--
 .../main/java/org/apache/doris/common/Config.java  |  2 +-
 .../org/apache/doris/load/BrokerFileGroup.java     |  2 +
 .../org/apache/doris/planner/LoadScanNode.java     |  5 +-
 .../apache/doris/planner/StreamLoadPlanner.java    | 14 ++++-
 .../planner/external/ExternalFileScanNode.java     | 21 ++++++-
 .../doris/planner/external/FileGroupInfo.java      | 15 +++++
 .../doris/planner/external/LoadScanProvider.java   |  3 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  3 +-
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  5 ++
 .../java/org/apache/doris/task/StreamLoadTask.java | 11 ++++
 .../doris/planner/StreamLoadPlannerTest.java       | 71 ----------------------
 .../apache/doris/utframe/TestWithFeService.java    |  8 ++-
 gensrc/thrift/FrontendService.thrift               |  1 +
 .../table_valued_function/test_hdfs_tvf.groovy     |  2 -
 .../suites/export_p0/test_outfile_parquet.groovy   |  2 +-
 .../external_catalog_p0/hive/test_hive_orc.groovy  | 27 --------
 .../hive/test_hive_other.groovy                    | 25 --------
 .../hive/test_hive_parquet.groovy                  | 28 ---------
 .../jsonb_p0/test_jsonb_load_and_function.groovy   | 26 +++++++-
 .../test_jsonb_load_unique_key_and_function.groovy |  2 +
 .../load_p0/broker_load/test_array_load.groovy     |  8 ---
 .../load_p0/broker_load/test_broker_load.groovy    | 21 -------
 ...n_column_exclude_schema_without_jsonpath.groovy |  2 -
 .../stream_load/load_json_null_to_nullable.groovy  |  6 --
 .../stream_load/load_json_with_jsonpath.groovy     |  7 ---
 .../load_p0/stream_load/test_hdfs_json_load.groovy |  6 --
 .../load_p0/stream_load/test_json_load.groovy      | 13 ----
 .../stream_load/test_txt_special_delimiter.groovy  |  7 ---
 .../test_streamload_perfomance.groovy              |  2 +-
 .../multi_catalog_query/hive_catalog_orc.groovy    | 25 --------
 .../hive_catalog_parquet.groovy                    | 25 --------
 36 files changed, 164 insertions(+), 304 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 69bfeabe48..b2acc0cedb 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -408,6 +408,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
         RETURN_IF_ERROR(file_sink->open());
         request.__isset.path = true;
         request.fileType = TFileType::FILE_LOCAL;
+        request.__set_file_size(ctx->body_bytes);
         ctx->body_sink = file_sink;
     }
     if (!http_req->header(HTTP_COLUMNS).empty()) {
diff --git a/be/src/util/jsonb_parser.h b/be/src/util/jsonb_parser.h
index c050fd305c..f4711f9a62 100644
--- a/be/src/util/jsonb_parser.h
+++ b/be/src/util/jsonb_parser.h
@@ -65,6 +65,7 @@
 #include "jsonb_document.h"
 #include "jsonb_error.h"
 #include "jsonb_writer.h"
+#include "string_parser.hpp"
 
 namespace doris {
 
@@ -894,8 +895,12 @@ private:
         }
 
         *pbuf = 0; // set null-terminator
-        int64_t val = strtol(num_buf_, NULL, 10);
-        if (errno == ERANGE) {
+        StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+        int64_t val =
+                StringParser::string_to_int<int64_t>(num_buf_, pbuf - num_buf_, &parse_result);
+        if (parse_result != StringParser::PARSE_SUCCESS) {
+            VLOG_ROW << "debug string_to_int error for " << num_buf_ << " val=" << val
+                     << " parse_result=" << parse_result;
             err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
             return false;
         }
@@ -950,7 +955,7 @@ private:
         }
 
         *pbuf = 0; // set null-terminator
-        return internConvertBufferToDouble();
+        return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
     }
 
     // parse the exponent part of a double number
@@ -990,15 +995,17 @@ private:
         }
 
         *pbuf = 0; // set null-terminator
-        return internConvertBufferToDouble();
+        return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
     }
 
     // call system function to parse double to string
-    bool internConvertBufferToDouble() {
-        double val = strtod(num_buf_, NULL);
-
-        if (errno == ERANGE) {
-            err_ = JsonbErrType::E_DOUBLE_OVERFLOW;
+    bool internConvertBufferToDouble(char* num_buf_, int len) {
+        StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+        double val = StringParser::string_to_float<double>(num_buf_, len, &parse_result);
+        if (parse_result != StringParser::PARSE_SUCCESS) {
+            VLOG_ROW << "debug string_to_float error for " << num_buf_ << " val=" << val
+                     << " parse_result=" << parse_result;
+            err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
             return false;
         }
 
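
A note on the jsonb_parser.h hunk above: strtol reports overflow only through
errno (which it does not reset on success) and silently clamps the value, so
the patch switches to Doris's StringParser, which returns an explicit
ParseResult. As a loose analogy only (Java has no errno), the same fail-loudly
contract looks like this:

    // Analogy only: like StringParser::string_to_int, Long.parseLong rejects
    // overflow and trailing garbage instead of clamping silently.
    public class StrictParseSketch {
        public static void main(String[] args) {
            System.out.println(Long.parseLong("12345")); // prints 12345
            try {
                Long.parseLong("99999999999999999999"); // does not fit in 64 bits
            } catch (NumberFormatException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }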
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 6d07886e89..2af4d40ea2 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -168,20 +168,24 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
         switch (physical_schema.type) {
         case tparquet::Type::BOOLEAN:
             type.type = TYPE_BOOLEAN;
-            return type;
+            break;
         case tparquet::Type::INT32:
             type.type = TYPE_INT;
-            return type;
+            break;
         case tparquet::Type::INT64:
         case tparquet::Type::INT96:
             type.type = TYPE_BIGINT;
-            return type;
+            break;
         case tparquet::Type::FLOAT:
             type.type = TYPE_FLOAT;
-            return type;
+            break;
         case tparquet::Type::DOUBLE:
             type.type = TYPE_DOUBLE;
-            return type;
+            break;
+        case tparquet::Type::BYTE_ARRAY:
+        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+            type.type = TYPE_STRING;
+            break;
         default:
             break;
         }
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b478d5c96e..6d170a6130 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -209,7 +209,11 @@ Status VFileScanner::_init_src_block(Block* block) {
             data_type = DataTypeFactory::instance().create_data_type(it->second, true);
         }
         if (data_type == nullptr) {
-            return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name()));
+            return Status::NotSupported(
+                    fmt::format("Not support data type:{} for column: {}",
+                                (it == _name_to_col_type.end() ? slot->type().debug_string()
+                                                               : it->second.debug_string()),
+                                slot->col_name()));
         }
         MutableColumnPtr data_column = data_type->create_column();
         _src_block.insert(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 79a8cf7840..b67574e743 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -222,9 +222,14 @@ public class DataDescription {
     public DataDescription(String tableName, LoadTaskInfo taskInfo) {
         this.tableName = tableName;
         this.partitionNames = taskInfo.getPartitions();
-        // Add a dummy path to just make analyze() happy.
-        // Stream load does not need this field.
-        this.filePaths = Lists.newArrayList("dummy");
+
+        if (!Strings.isNullOrEmpty(taskInfo.getPath())) {
+            this.filePaths = Lists.newArrayList(taskInfo.getPath());
+        } else {
+            // Add a dummy path to just make analyze() happy.
+            this.filePaths = Lists.newArrayList("dummy");
+        }
+
         this.fileFieldNames = taskInfo.getColumnExprDescs().getFileColNames();
         this.columnSeparator = taskInfo.getColumnSeparator();
         this.lineDelimiter = taskInfo.getLineDelimiter();
@@ -259,7 +264,20 @@ public class DataDescription {
                 // the compress type is saved in "compressType"
                 this.fileFormat = "csv";
             } else {
-                this.fileFormat = "json";
+                switch (type) {
+                    case FORMAT_ORC:
+                        this.fileFormat = "orc";
+                        break;
+                    case FORMAT_PARQUET:
+                        this.fileFormat = "parquet";
+                        break;
+                    case FORMAT_JSON:
+                        this.fileFormat = "json";
+                        break;
+                    default:
+                        this.fileFormat = "unknown";
+                        break;
+                }
             }
         }
         // get compress type
@@ -1019,3 +1037,4 @@ public class DataDescription {
         return toSql();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index c424350946..2555d8efaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1755,7 +1755,7 @@ public class Config extends ConfigBase {
      * Temp config, should be removed when new file scan node is ready.
      */
     @ConfField(mutable = true)
-    public static boolean enable_new_load_scan_node = false;
+    public static boolean enable_new_load_scan_node = true;
 
     /**
      * Max data version of backends serialize block.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index a89f48e52b..497f0c2d23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -155,6 +155,7 @@ public class BrokerFileGroup implements Writable {
         this.deleteCondition = dataDescription.getDeleteCondition();
         this.mergeType = dataDescription.getMergeType();
         this.sequenceCol = dataDescription.getSequenceCol();
+        this.filePaths = dataDescription.getFilePaths();
     }
 
     // NOTE: DBLock will be held
@@ -598,3 +599,4 @@ public class BrokerFileGroup implements Writable {
         return fileGroup;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
index 0f8eb77ffb..bf4c66dccb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -207,6 +207,8 @@ public abstract class LoadScanNode extends ScanNode {
                 expr.analyze(analyzer);
             }
 
+            // for jsonb type, use jsonb_parse_xxx to parse src string to jsonb.
+            // and if input string is not a valid json string, return null.
             PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
             PrimitiveType srcType = expr.getType().getPrimitiveType();
             if (dstType == PrimitiveType.JSONB
@@ -217,7 +219,7 @@ public abstract class LoadScanNode extends ScanNode {
                 if (destSlotDesc.getIsNullable() || expr.isNullable()) {
                     nullable = "nullable";
                 }
-                String name = "jsonb_parse_" + nullable + "_error_to_invalid";
+                String name = "jsonb_parse_" + nullable + "_error_to_null";
                 expr = new FunctionCallExpr(name, args);
                 expr.analyze(analyzer);
             } else {
@@ -251,3 +253,4 @@ public abstract class LoadScanNode extends ScanNode {
         planNode.setBrokerScanNode(brokerScanNode);
     }
 }
+
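
The function name assembled in the LoadScanNode hunk above resolves to one of
two names. A minimal, self-contained sketch of that construction, where the
boolean inputs stand in for destSlotDesc.getIsNullable() and expr.isNullable():

    // Sketch of the name construction above; the two booleans are stand-ins.
    public class JsonbParseNameSketch {
        static String jsonbParseFn(boolean destNullable, boolean exprNullable) {
            String nullable = (destNullable || exprNullable) ? "nullable" : "notnull";
            return "jsonb_parse_" + nullable + "_error_to_null";
        }

        public static void main(String[] args) {
            System.out.println(jsonbParseFn(true, false));  // jsonb_parse_nullable_error_to_null
            System.out.println(jsonbParseFn(false, false)); // jsonb_parse_notnull_error_to_null
        }
    }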
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index b55bb8f402..81e9009f65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -52,6 +52,7 @@ import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanFragmentExecParams;
@@ -180,9 +181,15 @@ public class StreamLoadPlanner {
             fileGroup.parse(db, dataDescription);
             // 2. create dummy file status
             TBrokerFileStatus fileStatus = new TBrokerFileStatus();
-            fileStatus.setPath("");
-            fileStatus.setIsDir(false);
-            fileStatus.setSize(-1); // must set to -1, means stream.
+            if (taskInfo.getFileType() == TFileType.FILE_LOCAL) {
+                fileStatus.setPath(taskInfo.getPath());
+                fileStatus.setIsDir(false);
+                fileStatus.setSize(taskInfo.getFileSize()); // for local file, use the actual file size
+            } else {
+                fileStatus.setPath("");
+                fileStatus.setIsDir(false);
+                fileStatus.setSize(-1); // must set to -1, means stream.
+            }
             fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
                     fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType());
             scanNode = fileScanNode;
@@ -323,3 +330,4 @@ public class StreamLoadPlanner {
         return null;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index ab0b25b08d..c42c97041b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -456,7 +456,25 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1));
                 expr.analyze(analyzer);
             }
-            expr = castToSlot(destSlotDesc, expr);
+
+            // for jsonb type, use jsonb_parse_xxx to parse src string to jsonb.
+            // and if input string is not a valid json string, return null.
+            PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
+            PrimitiveType srcType = expr.getType().getPrimitiveType();
+            if (dstType == PrimitiveType.JSONB
+                    && (srcType == PrimitiveType.VARCHAR || srcType == PrimitiveType.STRING)) {
+                List<Expr> args = Lists.newArrayList();
+                args.add(expr);
+                String nullable = "notnull";
+                if (destSlotDesc.getIsNullable() || expr.isNullable()) {
+                    nullable = "nullable";
+                }
+                String name = "jsonb_parse_" + nullable + "_error_to_null";
+                expr = new FunctionCallExpr(name, args);
+                expr.analyze(analyzer);
+            } else {
+                expr = castToSlot(destSlotDesc, expr);
+            }
             params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
         }
         params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
@@ -550,3 +568,4 @@ public class ExternalFileScanNode extends ExternalScanNode {
 }
 
 
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index eb99c88ed9..f3cb9a19f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -44,6 +44,7 @@ import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -140,6 +141,10 @@ public class FileGroupInfo {
         return loadParallelism;
     }
 
+    public TFileType getFileType() {
+        return fileType;
+    }
+
     public String getExplainString(String prefix) {
         StringBuilder sb = new StringBuilder();
         sb.append("file scan\n");
@@ -303,6 +308,15 @@ public class FileGroupInfo {
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setColumnsFromPath(columnsFromPath);
         } else {
+            // for stream load
+            if (getFileType() == TFileType.FILE_LOCAL) {
+                // when loading parquet via stream, there will be a local file saved on BE,
+                // so read it as a local file.
+                Preconditions.checkState(fileGroup.getFilePaths().size() == 1);
+                rangeDesc.setPath(fileGroup.getFilePaths().get(0));
+                rangeDesc.setStartOffset(0);
+                rangeDesc.setSize(fileStatus.size);
+            }
             rangeDesc.setLoadId(loadId);
             rangeDesc.setSize(fileStatus.size);
         }
@@ -310,3 +324,4 @@ public class FileGroupInfo {
     }
 }
 
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 7502fb0749..8f7ea2bb6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -111,7 +111,7 @@ public class LoadScanProvider implements FileScanProviderIf {
         TFileAttributes fileAttributes = new TFileAttributes();
         setFileAttributes(ctx.fileGroup, fileAttributes);
         params.setFileAttributes(fileAttributes);
-        params.setFileType(fileGroupInfo.getBrokerDesc().getFileType());
+        params.setFileType(fileGroupInfo.getFileType());
         ctx.params = params;
 
         initColumns(ctx, analyzer);
@@ -252,3 +252,4 @@ public class LoadScanProvider implements FileScanProviderIf {
         return fileGroupInfo.getTargetTable();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c7ab68ab0c..e4d0936fd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -899,7 +899,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         } catch (Throwable e) {
             LOG.warn("catch unknown result.", e);
             status.setStatusCode(TStatusCode.INTERNAL_ERROR);
-            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
             return result;
         }
         return result;
@@ -1230,3 +1230,4 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index dd97602003..642597246f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -71,6 +71,10 @@ public interface LoadTaskInfo {
 
     String getPath();
 
+    default long getFileSize() {
+        return 0;
+    }
+
     double getMaxFilterRatio();
 
     ImportColumnDescs getColumnExprDescs();
@@ -118,3 +122,4 @@ public interface LoadTaskInfo {
         }
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 300a93ca19..59393b7d01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -68,6 +68,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private Separator lineDelimiter;
     private PartitionNames partitions;
     private String path;
+    private long fileSize = 0;
     private boolean negative;
     private boolean strictMode = false; // default is false
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
@@ -159,6 +160,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return path;
     }
 
+    @Override
+    public long getFileSize() {
+        return fileSize;
+    }
+
     public boolean getNegative() {
         return negative;
     }
@@ -234,6 +240,7 @@ public class StreamLoadTask implements LoadTaskInfo {
         return !Strings.isNullOrEmpty(sequenceCol);
     }
 
+
     @Override
     public String getSequenceCol() {
         return sequenceCol;
@@ -249,6 +256,9 @@ public class StreamLoadTask implements LoadTaskInfo {
                 request.getFileType(), request.getFormatType(),
                 request.getCompressType());
         streamLoadTask.setOptionalFromTSLPutRequest(request);
+        if (request.isSetFileSize()) {
+            streamLoadTask.fileSize = request.getFileSize();
+        }
         return streamLoadTask;
     }
 
@@ -416,3 +426,4 @@ public class StreamLoadTask implements LoadTaskInfo {
         return maxFilterRatio;
     }
 }
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
index d8ddcfe909..870dbbeb55 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
@@ -17,90 +17,19 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.ImportColumnsStmt;
 import org.apache.doris.analysis.ImportWhereStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.task.StreamLoadTask;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TStreamLoadPutRequest;
-import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.collect.Lists;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mocked;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.StringReader;
-import java.util.Arrays;
-import java.util.List;
 
 public class StreamLoadPlannerTest {
-    @Injectable
-    Database db;
-
-    @Injectable
-    OlapTable destTable;
-
-    @Mocked
-    StreamLoadScanNode scanNode;
-
-    @Mocked
-    OlapTableSink sink;
-
-    @Mocked
-    Partition partition;
-
-    @Test
-    public void testNormalPlan() throws UserException {
-        List<Column> columns = Lists.newArrayList();
-        Column c1 = new Column("c1", PrimitiveType.BIGINT, false);
-        columns.add(c1);
-        Column c2 = new Column("c2", PrimitiveType.BIGINT, true);
-        columns.add(c2);
-        new Expectations() {
-            {
-                destTable.getBaseSchema();
-                minTimes = 0;
-                result = columns;
-                destTable.getPartitions();
-                minTimes = 0;
-                result = Arrays.asList(partition);
-                scanNode.init((Analyzer) any);
-                minTimes = 0;
-                scanNode.getChildren();
-                minTimes = 0;
-                result = Lists.newArrayList();
-                scanNode.getId();
-                minTimes = 0;
-                result = new PlanNodeId(5);
-                partition.getId();
-                minTimes = 0;
-                result = 0;
-            }
-        };
-        TStreamLoadPutRequest request = new TStreamLoadPutRequest();
-        request.setTxnId(1);
-        request.setLoadId(new TUniqueId(2, 3));
-        request.setFileType(TFileType.FILE_STREAM);
-        request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
-        StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, streamLoadTask);
-        planner.plan(streamLoadTask.getId());
-    }
-
     @Test
     public void testParseStmt() throws Exception {
         String sql = new String("COLUMNS (k1, k2, k3=abc(), k4=default_value())");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 90b0e4658d..979e143bd9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -96,6 +96,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -492,7 +493,12 @@ public abstract class TestWithFeService {
     }
 
     public void createTable(String sql) throws Exception {
-        createTables(sql);
+        try {
+            createTables(sql);
+        } catch (ConcurrentModificationException e) {
+            e.printStackTrace();
+            throw e;
+        }
     }
 
     public void dropTable(String table, boolean force) throws Exception {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 9cf6ffb6ee..47de9fcdc3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -544,6 +544,7 @@ struct TStreamLoadPutRequest {
     38: optional string header_type
     39: optional string hidden_columns
     40: optional PlanNodes.TFileCompressType compress_type
+    41: optional i64 file_size // only for stream load with parquet or orc
 }
 
 struct TStreamLoadPutResult {
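
The new optional field follows the usual thrift optional-field contract: the BE
sets it only when the stream is spooled to a local file (the __set_file_size
call in stream_load.cpp above), and the FE reads it guarded by isSetFileSize in
StreamLoadTask. A self-contained stub of that contract, with the
thrift-generated class replaced by a hand-written stand-in:

    // Stand-in for the thrift-generated TStreamLoadPutRequest; shows the
    // isSet/get pattern used by StreamLoadTask.fromTStreamLoadPutRequest above.
    public class FileSizeFieldSketch {
        static class RequestStub {
            private long fileSize;
            private boolean fileSizeSet; // thrift tracks "set" separately for optional fields

            void setFileSize(long v) { fileSize = v; fileSizeSet = true; }
            boolean isSetFileSize() { return fileSizeSet; }
            long getFileSize() { return fileSize; }
        }

        public static void main(String[] args) {
            RequestStub req = new RequestStub();
            req.setFileSize(1048576L); // BE sets this to ctx->body_bytes for local files
            long size = req.isSetFileSize() ? req.getFileSize() : -1; // -1 means pure stream
            System.out.println(size); // 1048576
        }
    }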
diff --git a/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy b/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
index 65996cca81..07b8363b0b 100644
--- a/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
+++ b/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
@@ -26,7 +26,6 @@ suite("test_hdfs_tvf") {
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
 
             // test csv format
             uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types.csv"
@@ -193,7 +192,6 @@ suite("test_hdfs_tvf") {
             assertTrue(result2[0][0] == 5, "Insert should update 12 rows")
             qt_insert """ select * from test_hdfs_tvf order by id; """
         } finally {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         }
     }
 }
diff --git a/regression-test/suites/export_p0/test_outfile_parquet.groovy b/regression-test/suites/export_p0/test_outfile_parquet.groovy
index 8b1944d2fb..9be0c8fdc4 100644
--- a/regression-test/suites/export_p0/test_outfile_parquet.groovy
+++ b/regression-test/suites/export_p0/test_outfile_parquet.groovy
@@ -22,7 +22,7 @@ import java.nio.file.Files
 import java.nio.file.Paths
 
 suite("test_outfile_parquet") {
-    def dbName = "test_query_db"
+    def dbName = "test_outfile_parquet"
     sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
     sql "USE $dbName"
     StringBuilder strBuilder = new StringBuilder()
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy
index 7e1f8b78fd..2f8d610200 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy
@@ -66,37 +66,12 @@ suite("test_hive_orc", "all_types") {
         qt_only_partition_col """select count(p1_col), count(p2_col) from orc_all_types;"""
     }
 
-    def set_be_config = { flag ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            StringBuilder setConfigCommand = new StringBuilder();
-            setConfigCommand.append("curl -X POST http://")
-            setConfigCommand.append(backend[2])
-            setConfigCommand.append(":")
-            setConfigCommand.append(backend[5])
-            setConfigCommand.append("/api/update_config?")
-            String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
-            logger.info(command1)
-            String command2 = setConfigCommand.toString() + "enable_new_file_scanner=$flag"
-            logger.info(command2)
-            def process1 = command1.execute()
-            int code = process1.waitFor()
-            assertEquals(code, 0)
-            def process2 = command2.execute()
-            code = process1.waitFor()
-            assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
             String hms_port = context.config.otherConfigs.get("hms_port")
             String catalog_name = "hive_test_orc"
             sql """admin set frontend config ("enable_multi_catalog" = "true")"""
-            sql """admin set frontend config ("enable_new_load_scan_node" = "true");"""
-            set_be_config.call('true')
             sql """drop catalog if exists ${catalog_name}"""
             sql """
             create catalog if not exists ${catalog_name} properties (
@@ -114,8 +89,6 @@ suite("test_hive_orc", "all_types") {
             only_partition_col()
 
         } finally {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-            set_be_config.call('false')
         }
     }
 }
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
index f8401d70e9..71c184b872 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
@@ -50,36 +50,11 @@ suite("test_hive_other", "p0") {
     }
 
 
-    def set_be_config = { ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
-            // logger.info(command1)
-            // String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
-            // logger.info(command2)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-            // def process2 = command2.execute()
-            // code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String hms_port = context.config.otherConfigs.get("hms_port")
         String hdfs_port = context.config.otherConfigs.get("hdfs_port")
         String catalog_name = "hive_test_other"
-        set_be_config.call()
 
         sql """admin set frontend config ("enable_multi_catalog" = "true")"""
         sql """drop catalog if exists ${catalog_name}"""
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy
index 8cb89baec5..d644699f00 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy
@@ -139,38 +139,12 @@ suite("test_hive_parquet", "p0") {
     """
     }
 
-
-    def set_be_config = { flag ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            StringBuilder setConfigCommand = new StringBuilder();
-            setConfigCommand.append("curl -X POST http://")
-            setConfigCommand.append(backend[2])
-            setConfigCommand.append(":")
-            setConfigCommand.append(backend[5])
-            setConfigCommand.append("/api/update_config?")
-            String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
-            logger.info(command1)
-            String command2 = setConfigCommand.toString() + "enable_new_file_scanner=$flag"
-            logger.info(command2)
-            def process1 = command1.execute()
-            int code = process1.waitFor()
-            assertEquals(code, 0)
-            def process2 = command2.execute()
-            code = process1.waitFor()
-            assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
             String hms_port = context.config.otherConfigs.get("hms_port")
             String catalog_name = "hive_test_parquet"
             sql """admin set frontend config ("enable_multi_catalog" = "true")"""
-            sql """admin set frontend config ("enable_new_load_scan_node" = "true");"""
-            set_be_config.call('true')
             sql """drop catalog if exists ${catalog_name}"""
             sql """
             create catalog if not exists ${catalog_name} properties (
@@ -201,8 +175,6 @@ suite("test_hive_parquet", "p0") {
             q19()
             q20()
         } finally {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-            set_be_config.call('false')
         }
     }
 }
diff --git a/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy b/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
index b410251cc4..9c4abca487 100644
--- a/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
+++ b/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
 suite("test_jsonb_load_and_function", "p0") {
     // define a sql table
     def testTable = "tbl_test_jsonb"
@@ -35,12 +37,12 @@ suite("test_jsonb_load_and_function", "p0") {
         """
 
     // load the jsonb data from csv file
-    // fail by default for invalid data rows
     streamLoad {
         table testTable
         
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
        // if declared a check callback, the default check condition will ignore.
         // So you must check all condition
@@ -50,12 +52,23 @@ suite("test_jsonb_load_and_function", "p0") {
             }
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
+
+            StringBuilder sb = new StringBuilder()
+            sb.append("curl -X GET " + json.ErrorURL)
+            String command = sb.toString()
+            def process = command.execute()
+            def code = process.waitFor()
+            def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
+            def out = process.getText()
+            log.info("error result: " + out)
+
             assertEquals("fail", json.Status.toLowerCase())
             assertEquals("too many filtered rows", json.Message)
             assertEquals(25, json.NumberTotalRows)
             assertEquals(18, json.NumberLoadedRows)
             assertEquals(7, json.NumberFilteredRows)
             assertTrue(json.LoadBytes > 0)
+            log.info("url: " + json.ErrorURL)
         }
     }
 
@@ -68,6 +81,7 @@ suite("test_jsonb_load_and_function", "p0") {
         set 'max_filter_ratio', '0.3'
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
        // if declared a check callback, the default check condition will ignore.
         // So you must check all condition
@@ -77,6 +91,16 @@ suite("test_jsonb_load_and_function", "p0") {
             }
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
+
+            StringBuilder sb = new StringBuilder()
+            sb.append("curl -X GET " + json.ErrorURL)
+            String command = sb.toString()
+            def process = command.execute()
+            def code = process.waitFor()
+            def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
+            def out = process.getText()
+            log.info("error result: " + out)
+
             assertEquals("success", json.Status.toLowerCase())
             assertEquals(25, json.NumberTotalRows)
             assertEquals(18, json.NumberLoadedRows)
diff --git a/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy b/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
index 4c3bd82e83..0789d48c4e 100644
--- a/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
+++ b/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
@@ -41,6 +41,7 @@ suite("test_jsonb_unique_load_and_function", "p0") {
         
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
        // if declared a check callback, the default check condition will ignore.
         // So you must check all condition
@@ -68,6 +69,7 @@ suite("test_jsonb_unique_load_and_function", "p0") {
         set 'max_filter_ratio', '0.3'
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
        // if declared a check callback, the default check condition will ignore.
         // So you must check all condition
diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
index 7f5d109952..271050f3ca 100644
--- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
@@ -202,13 +202,6 @@ suite("test_array_load", "load_p0") {
 
     try {
         for ( i in 0..1 ) {
-            // should be deleted after new_load_scan is ready
-            if (i == 1) {
-                sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-            } else {
-                sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
-            }
-
             // case1: import array data in json format and enable vectorized engine
             try {
                 sql "DROP TABLE IF EXISTS ${testTable}"
@@ -280,7 +273,6 @@ suite("test_array_load", "load_p0") {
             }
         }
     } finally {
-        try_sql("""ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""")
     }
 
 
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
index e1f16676dd..e67397e821 100644
--- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
@@ -192,28 +192,8 @@ suite("test_broker_load", "p0") {
         logger.info("Submit load with label: $uuid, table: $table, path: $path")
     }
 
-    def set_be_config = { flag->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
-            // logger.info(command1)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         def uuids = []
-        set_be_config.call('true')
         try {
             def i = 0
             for (String table in tables) {
@@ -258,7 +238,6 @@ suite("test_broker_load", "p0") {
             order_qt_parquet_s3_case9 """ select * from parquet_s3_case9"""
 
         } finally {
-            set_be_config.call('false')
             for (String table in tables) {
                 sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
             }
diff --git a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
index 760af3344e..d00316c37a 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
@@ -51,8 +51,6 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
 
     def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
                             json_root, where_expr, fuzzy_flag, column_sep, file_name ->
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
 
         // load the json data
         streamLoad {
diff --git a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
index f934c038a2..be0baf42b8 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
@@ -42,9 +42,6 @@ suite("test_load_json_null_to_nullable", "p0") {
 
     def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
                             json_root, where_expr, fuzzy_flag, column_sep, file_name ->
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
-
         // load the json data
         streamLoad {
             table table_name
@@ -77,9 +74,6 @@ suite("test_load_json_null_to_nullable", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
 
     def check_data_correct = {table_name ->
diff --git a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
index 02ffd808e2..f48b41be79 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
@@ -42,10 +42,6 @@ suite("test_load_json_with_jsonpath", "p0") {
 
     def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
                             json_root, where_expr, fuzzy_flag, column_sep, file_name ->
-        
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
-
         // load the json data
         streamLoad {
             table table_name
@@ -78,9 +74,6 @@ suite("test_load_json_with_jsonpath", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
 
     def check_data_correct = {table_name ->
diff --git a/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
index 5c79cf1b97..6565357e96 100644
--- a/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
@@ -44,9 +44,6 @@ suite("test_hdfs_json_load", "p0") {
 
     def load_from_hdfs1 = {new_json_reader_flag, strip_flag, fuzzy_flag, testTablex, label, fileName,
                             fsPath, hdfsUser, exprs, jsonpaths, json_root, columns_parameter, where ->
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
-        
        def hdfsFilePath = "${fsPath}/user/doris/preinstalled_data/json_format_test/${fileName}"
         def result1= sql """
                         LOAD LABEL ${label} (
@@ -76,9 +73,6 @@ suite("test_hdfs_json_load", "p0") {
         assertTrue(result1.size() == 1)
         assertTrue(result1[0].size() == 1)
         assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
-
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
 
     def check_load_result = {checklabel, testTablex ->
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index e066467e3e..513d3e14dd 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -116,8 +116,6 @@ suite("test_json_load", "p0") {
     
     def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, format_flag, exprs, json_paths,
                         json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false ->
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
         
         // load the json data
         streamLoad {
@@ -150,9 +148,6 @@ suite("test_json_load", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
     
     def load_from_hdfs1 = {testTablex, label, hdfsFilePath, format, 
brokerName, hdfsUser, hdfsPasswd ->
@@ -529,8 +524,6 @@ suite("test_json_load", "p0") {
     try {
         sql "DROP TABLE IF EXISTS ${testTable}"
         create_test_table3.call(testTable)
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         // load the json data
         streamLoad {
             table "${testTable}"
@@ -557,16 +550,12 @@ suite("test_json_load", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         sql "sync"
         qt_select13 "select * from ${testTable} order by id"
 
 
         sql "DROP TABLE IF EXISTS ${testTable}"
         create_test_table3.call(testTable)
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
         // load the json data
         streamLoad {
             table "${testTable}"
@@ -593,8 +582,6 @@ suite("test_json_load", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         sql "sync"
         qt_select13 "select * from ${testTable} order by id"
 
diff --git a/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy b/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
index fff343078b..0e762f31d3 100644
--- a/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
@@ -31,13 +31,6 @@ suite("test_txt_special_delimiter", "p0") {
         PROPERTIES ("replication_allocation" = "tag.location.default: 1");
     """
     for ( i in 0..1 ) {
-        // should be deleted after new_load_scan is ready
-        if (i == 1) {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-        } else {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
-        }
-
         // test special_delimiter success
         streamLoad {
             table "${tableName}"
diff --git a/regression-test/suites/performance_p0/test_streamload_perfomance.groovy b/regression-test/suites/performance_p0/test_streamload_perfomance.groovy
index ef7e689e1d..adb0cafb87 100644
--- a/regression-test/suites/performance_p0/test_streamload_perfomance.groovy
+++ b/regression-test/suites/performance_p0/test_streamload_perfomance.groovy
@@ -37,7 +37,7 @@ suite("test_streamload_perfomance") {
 
         streamLoad {
             table tableName
-            time 5000
+            time 10000
             inputIterator rowIt
         }
     } finally {
diff --git a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy
index 13b12e0027..e2c2cf8b1e 100644
--- a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy
+++ b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy
@@ -797,35 +797,10 @@ order by
         """
     }
 
-    def set_be_config = { ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
-            // logger.info(command1)
-            // String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
-            // logger.info(command2)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-            // def process2 = command2.execute()
-            // code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String hms_port = context.config.otherConfigs.get("hms_port")
         String catalog_name = "test_catalog_hive_orc"
-        set_be_config.call()
 
         sql """admin set frontend config ("enable_multi_catalog" = "true")"""
         sql """drop catalog if exists ${catalog_name}"""
diff --git a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy
index ce36a181a4..01530b738a 100644
--- a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy
+++ b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy
@@ -797,35 +797,10 @@ order by
         """
     }
 
-    def set_be_config = { ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
-            // logger.info(command1)
-            // String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
-            // logger.info(command2)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-            // def process2 = command2.execute()
-            // code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String hms_port = context.config.otherConfigs.get("hms_port")
         String catalog_name = "test_catalog_hive_parquet"
-        set_be_config.call()
 
         sql """admin set frontend config ("enable_multi_catalog" = "true")"""
         sql """drop catalog if exists ${catalog_name}"""

