This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b4c2cabb4 [feature](new-scan) support transactional insert in new 
scan framework (#13858)
7b4c2cabb4 is described below

commit 7b4c2cabb46f5dcfbec27251d64c15275d4fd6a1
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Thu Nov 3 08:36:07 2022 +0800

    [feature](new-scan) support transactional insert in new scan framework 
(#13858)
    
    
    Support running transactional insert operation with new scan framework. eg:
    
    admin set frontend config("enable_new_load_scan_node" = "true");
    begin;
    insert into tbl1 values(1,2);
    insert into tbl1 values(3,4);
    insert into tbl1 values(5,6);
    commit;
    Add some limitation to transactional insert
    
    Do not support non-literal value in insert stmt
    Fix some issue about array type:
    
    Forbid cast other non-array type to NESTED array type, it may cause BE 
crash.
    Add getStringValueForArray() method for Expr, to get valid string-formatted 
array type value.
    Add useLocalSessionState=true in regression-test jdbc url
    without this config, the jdbc driver will send some init cmd each time it 
connect to server, such as
    select @@session.tx_read_only.
    But when we use transactional insert, after begin command, Doris do not 
support any other type of
    stmt except for insert, commit or rollback.
    So adding this config to let the jdbc NOT send cmd when connecting.
---
 be/src/runtime/fragment_mgr.cpp                    | 40 ++++++------
 be/src/vec/exec/format/csv/csv_reader.cpp          |  8 ++-
 be/src/vec/exec/format/csv/csv_reader.h            |  1 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |  3 +-
 be/src/vec/exprs/vcast_expr.cpp                    |  1 -
 be/src/vec/functions/simple_function_factory.h     |  2 +-
 .../org/apache/doris/analysis/ArrayLiteral.java    | 13 +++-
 .../org/apache/doris/analysis/BoolLiteral.java     |  5 ++
 .../java/org/apache/doris/analysis/CastExpr.java   |  6 ++
 .../org/apache/doris/analysis/DateLiteral.java     |  5 ++
 .../org/apache/doris/analysis/DecimalLiteral.java  |  5 ++
 .../main/java/org/apache/doris/analysis/Expr.java  | 29 +++++++--
 .../org/apache/doris/analysis/FloatLiteral.java    |  5 ++
 .../java/org/apache/doris/analysis/IntLiteral.java |  5 ++
 .../org/apache/doris/analysis/JsonLiteral.java     | 10 +++
 .../org/apache/doris/analysis/LargeIntLiteral.java |  5 ++
 .../org/apache/doris/analysis/LiteralExpr.java     |  8 ++-
 .../java/org/apache/doris/analysis/MaxLiteral.java | 10 +++
 .../org/apache/doris/analysis/NullLiteral.java     |  7 ++
 .../org/apache/doris/analysis/StringLiteral.java   |  5 ++
 .../org/apache/doris/catalog/PrimitiveType.java    |  1 +
 .../main/java/org/apache/doris/catalog/Type.java   | 17 +++--
 .../apache/doris/qe/InsertStreamTxnExecutor.java   | 13 +++-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 15 +++--
 .../apache/doris/analysis/ArrayLiteralTest.java    | 65 +++++++++++++++++++
 .../apache/doris/analysis/InsertArrayStmtTest.java | 46 ++++++++++++-
 regression-test/conf/regression-conf.groovy        |  5 +-
 regression-test/data/insert_p0/txn_insert.out      | 29 +++++++++
 regression-test/suites/insert_p0/txn_insert.groovy | 75 ++++++++++++++++++++++
 29 files changed, 389 insertions(+), 50 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7296ae4628..46b8aa44ba 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -517,32 +517,32 @@ void 
FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
     if (params.txn_conf.need_txn) {
-        StreamLoadContext* stream_load_cxt = new StreamLoadContext(_exec_env);
-        stream_load_cxt->db = params.txn_conf.db;
-        stream_load_cxt->db_id = params.txn_conf.db_id;
-        stream_load_cxt->table = params.txn_conf.tbl;
-        stream_load_cxt->txn_id = params.txn_conf.txn_id;
-        stream_load_cxt->id = UniqueId(params.params.query_id);
-        stream_load_cxt->put_result.params = params;
-        stream_load_cxt->use_streaming = true;
-        stream_load_cxt->load_type = TLoadType::MANUL_LOAD;
-        stream_load_cxt->load_src_type = TLoadSourceType::RAW;
-        stream_load_cxt->label = params.import_label;
-        stream_load_cxt->format = TFileFormatType::FORMAT_CSV_PLAIN;
-        stream_load_cxt->timeout_second = 3600;
-        stream_load_cxt->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
-        stream_load_cxt->need_commit_self = true;
-        stream_load_cxt->need_rollback = true;
+        StreamLoadContext* stream_load_ctx = new StreamLoadContext(_exec_env);
+        stream_load_ctx->db = params.txn_conf.db;
+        stream_load_ctx->db_id = params.txn_conf.db_id;
+        stream_load_ctx->table = params.txn_conf.tbl;
+        stream_load_ctx->txn_id = params.txn_conf.txn_id;
+        stream_load_ctx->id = UniqueId(params.params.query_id);
+        stream_load_ctx->put_result.params = params;
+        stream_load_ctx->use_streaming = true;
+        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
+        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
+        stream_load_ctx->label = params.import_label;
+        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
+        stream_load_ctx->timeout_second = 3600;
+        stream_load_ctx->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
+        stream_load_ctx->need_commit_self = true;
+        stream_load_ctx->need_rollback = true;
         // total_length == -1 means read one message from pipe in once time, 
don't care the length.
         auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* 
max_buffered_bytes */,
                                                      64 * 1024 /* 
min_chunk_size */,
                                                      -1 /* total_length */, 
true /* use_proto */);
-        stream_load_cxt->body_sink = pipe;
-        stream_load_cxt->max_filter_ratio = params.txn_conf.max_filter_ratio;
+        stream_load_ctx->body_sink = pipe;
+        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
 
-        RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, 
pipe));
+        RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_ctx->id, 
pipe));
 
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt));
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
         set_pipe(params.params.fragment_instance_id, pipe);
         return Status::OK();
     } else {
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 20c8d71a96..b7661e4109 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -51,6 +51,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* 
profile, ScannerCounte
           _decompressor(nullptr),
           _skip_lines(0) {
     _file_format_type = _params.format_type;
+    _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
     _file_compress_type = _params.compress_type;
     _size = _range.size;
 
@@ -119,7 +120,9 @@ Status CsvReader::init_reader(bool is_load) {
     case TFileFormatType::FORMAT_CSV_DEFLATE:
         _line_reader.reset(new PlainTextLineReader(_profile, real_reader, 
_decompressor.get(),
                                                    _size, _line_delimiter, 
_line_delimiter_length));
-
+        break;
+    case TFileFormatType::FORMAT_PROTO:
+        _line_reader.reset(new PlainBinaryLineReader(real_reader));
         break;
     default:
         return Status::InternalError(
@@ -209,6 +212,7 @@ Status CsvReader::_create_decompressor() {
         }
     } else {
         switch (_file_format_type) {
+        case TFileFormatType::FORMAT_PROTO:
         case TFileFormatType::FORMAT_CSV_PLAIN:
             compress_type = CompressType::UNCOMPRESSED;
             break;
@@ -265,7 +269,7 @@ Status CsvReader::_fill_dest_columns(const Slice& line, 
Block* block, size_t* ro
 }
 
 Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
-    if (!validate_utf8(line.data, line.size)) {
+    if (!_is_proto_format && !validate_utf8(line.data, line.size)) {
         if (!_is_load) {
             return Status::InternalError("Only support csv data in utf8 
codec");
         } else {
diff --git a/be/src/vec/exec/format/csv/csv_reader.h 
b/be/src/vec/exec/format/csv/csv_reader.h
index d1ab1ebb26..cbb1b2c882 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -76,6 +76,7 @@ private:
     std::unique_ptr<Decompressor> _decompressor;
 
     TFileFormatType::type _file_format_type;
+    bool _is_proto_format;
     TFileCompressType::type _file_compress_type;
     int64_t _size;
     // When we fetch range start from 0, header_type="csv_with_names" skip 
first line
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 9df3722b95..2ee83b014a 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -489,7 +489,8 @@ Status VFileScanner::_get_next_reader() {
         case TFileFormatType::FORMAT_CSV_BZ2:
         case TFileFormatType::FORMAT_CSV_LZ4FRAME:
         case TFileFormatType::FORMAT_CSV_LZOP:
-        case TFileFormatType::FORMAT_CSV_DEFLATE: {
+        case TFileFormatType::FORMAT_CSV_DEFLATE:
+        case TFileFormatType::FORMAT_PROTO: {
             _cur_reader.reset(
                     new CsvReader(_state, _profile, &_counter, _params, range, 
_file_slot_descs));
             init_status = 
((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index 25b14ba3a9..68f3b05cc1 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -46,7 +46,6 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, 
const doris::RowDes
     argument_template.reserve(2);
     argument_template.emplace_back(std::move(child_column), 
child->data_type(), child_name);
     argument_template.emplace_back(_cast_param, _cast_param_data_type, 
_target_data_type_name);
-
     _function = SimpleFunctionFactory::instance().get_function(function_name, 
argument_template,
                                                                _data_type);
 
diff --git a/be/src/vec/functions/simple_function_factory.h 
b/be/src/vec/functions/simple_function_factory.h
index cc7fca5008..3ffe5107d6 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -148,7 +148,7 @@ public:
             return iter->second()->build(arguments, return_type);
         }
 
-        LOG(WARNING) << fmt::format("Function signature {} is not founded", 
key_str);
+        LOG(WARNING) << fmt::format("Function signature {} is not found", 
key_str);
         return nullptr;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java
index 989488610d..c0331c0318 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java
@@ -61,7 +61,7 @@ public class ArrayLiteral extends LiteralExpr {
 
         children = new ArrayList<>();
         for (LiteralExpr expr : exprs) {
-            if (expr.getType() == itemType) {
+            if (expr.getType().equals(itemType)) {
                 children.add(expr);
             } else {
                 children.add(expr.castTo(itemType));
@@ -102,11 +102,17 @@ public class ArrayLiteral extends LiteralExpr {
     @Override
     public String getStringValue() {
         List<String> list = new ArrayList<>(children.size());
-        children.forEach(v -> list.add(((LiteralExpr) v).getStringValue()));
-
+        children.forEach(v -> list.add(v.getStringValue()));
         return "ARRAY[" + StringUtils.join(list, ", ") + "]";
     }
 
+    @Override
+    public String getStringValueForArray() {
+        List<String> list = new ArrayList<>(children.size());
+        children.forEach(v -> list.add(v.getStringValueForArray()));
+        return "[" + StringUtils.join(list, ", ") + "]";
+    }
+
     @Override
     protected void toThrift(TExprNode msg) {
         msg.node_type = TExprNodeType.ARRAY_LITERAL;
@@ -164,3 +170,4 @@ public class ArrayLiteral extends LiteralExpr {
         }
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java
index e434668286..c37a620235 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java
@@ -105,6 +105,11 @@ public class BoolLiteral extends LiteralExpr {
         return value ? "1" : "0";
     }
 
+    @Override
+    public String getStringValueForArray() {
+        return "\"" + getStringValue() + "\"";
+    }
+
     @Override
     public ByteBuffer getHashValue(PrimitiveType type) {
         byte v = (byte) (value ? 1 : 0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
index 87bfff145a..f72c3cce8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
@@ -573,4 +573,10 @@ public class CastExpr extends Expr {
                     "doris::CastFunctions::cast_to_array_val", null, null, 
true);
         }
     }
+
+    @Override
+    public String getStringValueForArray() {
+        return children.get(0).getStringValueForArray();
+    }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java
index 9cd48b53ae..83f4473114 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java
@@ -557,6 +557,11 @@ public class DateLiteral extends LiteralExpr {
         }
     }
 
+    @Override
+    public String getStringValueForArray() {
+        return "\"" + getStringValue() + "\"";
+    }
+
     public void roundCeiling(int newScale) {
         Preconditions.checkArgument(type.isDatetimeV2());
         long remain = Double.valueOf(microsecond % (Math.pow(10, 6 - 
newScale))).longValue();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java
index 53ca2ba9ca..1969c4bbed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java
@@ -220,6 +220,11 @@ public class DecimalLiteral extends LiteralExpr {
         return value.toString();
     }
 
+    @Override
+    public String getStringValueForArray() {
+        return "\"" + getStringValue() + "\"";
+    }
+
     @Override
     public long getLongValue() {
         return value.longValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 7753472864..fa60ac7e7d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -1373,8 +1373,8 @@ public abstract class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
         // "cast %s to %s", this.type, targetType);
         // TODO(zc): use implicit cast
         if (!Type.canCastTo(this.type, targetType)) {
-            throw new AnalysisException("type not match, originType=" + 
this.type
-                    + ", targeType=" + targetType);
+            throw new AnalysisException("can not cast from origin type " + 
this.type
+                    + " to target type=" + targetType);
 
         }
         return uncheckedCastTo(targetType);
@@ -1879,12 +1879,18 @@ public abstract class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
     }
 
     public String getStringValue() {
-        if (this instanceof LiteralExpr) {
-            return ((LiteralExpr) this).getStringValue();
-        }
         return "";
     }
 
+    // A special method only for array literal, all primitive type in array
+    // will be wrapped by double quote. eg:
+    // ["1", "2", "3"]
+    // ["a", "b", "c"]
+    // [["1", "2", "3"], ["1"], ["3"]]
+    public String getStringValueForArray() {
+        return null;
+    }
+
     public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) {
         for (Expr child : expr.getChildren()) {
             if (child.isBoundByTupleIds(tids)) {
@@ -2016,4 +2022,17 @@ public abstract class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
             child.materializeSrcExpr();
         }
     }
+
+    // This is only for transactional insert operation,
+    // to check it the given value in insert stmt is LiteralExpr.
+    // And if we write "1" to a boolean column, there will be a cast(1 as 
boolean) expr,
+    // which is also accepted.
+    public boolean isLiteralOrCastExpr() {
+        if (this instanceof CastExpr) {
+            return children.get(0) instanceof LiteralExpr;
+        } else {
+            return this instanceof LiteralExpr;
+        }
+    }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java
index 75d5961d43..9a382c7cf2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java
@@ -136,6 +136,11 @@ public class FloatLiteral extends LiteralExpr {
         return Double.toString(value);
     }
 
+    @Override
+    public String getStringValueForArray() {
+        return "\"" + getStringValue() + "\"";
+    }
+
     public static Type getDefaultTimeType(Type type) throws AnalysisException {
         switch (type.getPrimitiveType()) {
             case TIME:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
index 6a66f9d2c4..248a08e5e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
@@ -266,6 +266,11 @@ public class IntLiteral extends LiteralExpr {
         return Long.toString(value);
     }
 
+    @Override
+    public String getStringValueForArray() {
+        return "\"" + getStringValue() + "\"";
+    }
+
     @Override
     public long getLongValue() {
         return value;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java
index 459a126bad..d573d07985 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java
@@ -96,6 +96,16 @@ public class JsonLiteral extends LiteralExpr {
         msg.json_literal = new TJsonLiteral(getUnescapedValue());
     }
 
+    @Override
+    public String getStringValue() {
+        return null;
+    }
+
+    @Override
+    public String getStringValueForArray() {
+        return null;
+    }
+
     public String getUnescapedValue() {
         // Unescape string exactly like Hive does. Hive's method assumes
         // quotes so we add them here to reuse Hive's code.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java
index b570b4e0b3..7fb5518a40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java
@@ -190,6 +190,11 @@ public class LargeIntLiteral extends LiteralExpr {
         return value.toString();
     }
 
+    @Override
+    public String getStringValueForArray() {
+        return "\"" + getStringValue() + "\"";
+    }
+
     @Override
     public long getLongValue() {
         return value.longValue();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
index 831773b539..0528e33eed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
@@ -187,9 +187,11 @@ public abstract class LiteralExpr extends Expr implements 
Comparable<LiteralExpr
     // literal values to the metastore rather than to Palo backends. This is 
similar to
     // the toSql() method, but does not perform any formatting of the string 
values. Neither
     // method unescapes string values.
-    public String getStringValue() {
-        return null;
-    }
+    @Override
+    public abstract String getStringValue();
+
+    @Override
+    public abstract String getStringValueForArray();
 
     public long getLongValue() {
         return 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MaxLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/MaxLiteral.java
index 783ad88823..b0d54eb503 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MaxLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MaxLiteral.java
@@ -66,6 +66,16 @@ public final class MaxLiteral extends LiteralExpr {
     public void write(DataOutput out) throws IOException {
     }
 
+    @Override
+    public String getStringValue() {
+        return null;
+    }
+
+    @Override
+    public String getStringValueForArray() {
+        return null;
+    }
+
     public void readFields(DataInput in) throws IOException {
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NullLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NullLiteral.java
index db2413fd8e..79e1b247b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NullLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NullLiteral.java
@@ -101,6 +101,13 @@ public class NullLiteral extends LiteralExpr {
         return "NULL";
     }
 
+    // the null value inside an array is represented as "null", for exampe:
+    // [null, null]. Not same as other primitive type to represent as \N.
+    @Override
+    public String getStringValueForArray() {
+        return "null";
+    }
+
     @Override
     public long getLongValue() {
         return 0;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
index 9761a43725..89e4de09f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java
@@ -151,6 +151,11 @@ public class StringLiteral extends LiteralExpr {
         return value;
     }
 
+    @Override
+    public String getStringValueForArray() {
+        return "\"" + getStringValue() + "\"";
+    }
+
     @Override
     public long getLongValue() {
         return Long.valueOf(value);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
index 89e0a2ee24..cd9e6d6881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
@@ -1203,3 +1203,4 @@ public enum PrimitiveType {
         }
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
index be8217e42b..74cab46808 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
@@ -483,13 +483,17 @@ public abstract class Type {
         return false;
     }
 
-    public static boolean canCastTo(Type t1, Type t2) {
-        if (t1.isScalarType() && t2.isScalarType()) {
-            return ScalarType.canCastTo((ScalarType) t1, (ScalarType) t2);
-        } else if (t1.isArrayType() && t2.isArrayType()) {
-            return ArrayType.canCastTo((ArrayType) t1, (ArrayType) t2);
+    public static boolean canCastTo(Type sourceType, Type targetType) {
+        if (sourceType.isScalarType() && targetType.isScalarType()) {
+            return ScalarType.canCastTo((ScalarType) sourceType, (ScalarType) 
targetType);
+        } else if (sourceType.isArrayType() && targetType.isArrayType()) {
+            return ArrayType.canCastTo((ArrayType) sourceType, (ArrayType) 
targetType);
+        } else if (targetType.isArrayType() && !((ArrayType) 
targetType).getItemType().isScalarType()
+                && !sourceType.isNull()) {
+            // TODO: current not support cast any non-array type(except for 
null) to nested array type.
+            return false;
         }
-        return t1.isNull() || t1.getPrimitiveType().isCharFamily();
+        return sourceType.isNull() || 
sourceType.getPrimitiveType().isCharFamily();
     }
 
     /**
@@ -1718,3 +1722,4 @@ public abstract class Type {
         return false;
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index e8e822e715..033d013ecf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.proto.InternalService;
@@ -31,6 +32,7 @@ import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentParamsList;
+import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -75,8 +77,15 @@ public class InsertStreamTxnExecutor {
         tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
         for (Map.Entry<Integer, List<TScanRangeParams>> entry : 
tRequest.params.per_node_scan_ranges.entrySet()) {
             for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                for (TBrokerRangeDesc desc : 
scanRangeParams.scan_range.broker_scan_range.ranges) {
-                    desc.setFormatType(TFileFormatType.FORMAT_PROTO);
+                if (Config.enable_new_load_scan_node && 
Config.enable_vectorized_load) {
+                    
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                            TFileFormatType.FORMAT_PROTO);
+                    
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                            TFileCompressType.PLAIN);
+                } else {
+                    for (TBrokerRangeDesc desc : 
scanRangeParams.scan_range.broker_scan_range.ranges) {
+                        desc.setFormatType(TFileFormatType.FORMAT_PROTO);
+                    }
                 }
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 52764e84bb..d60b7fb97d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -220,14 +220,20 @@ public class StmtExecutor implements ProfileWriter {
         this.context.setStatementContext(statementContext);
     }
 
-    public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
-        if (cols.size() == 0) {
+    public static InternalService.PDataRow getRowStringValue(List<Expr> cols) 
throws UserException {
+        if (cols.isEmpty()) {
             return null;
         }
         InternalService.PDataRow.Builder row = 
InternalService.PDataRow.newBuilder();
         for (Expr expr : cols) {
+            if (!expr.isLiteralOrCastExpr()) {
+                throw new UserException(
+                        "do not support non-literal expr in transactional 
insert operation: " + expr.toSql());
+            }
             if (expr instanceof NullLiteral) {
                 row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
+            } else if (expr instanceof ArrayLiteral) {
+                row.addColBuilder().setValue(expr.getStringValueForArray());
             } else {
                 row.addColBuilder().setValue(expr.getStringValue());
             }
@@ -540,8 +546,8 @@ public class StmtExecutor implements ProfileWriter {
                         queryType = "Insert";
                     }
                 } catch (Throwable t) {
-                    LOG.warn("handle insert stmt fail", t);
-                    // the transaction of this insert may already begun, we 
will abort it at outer finally block.
+                    LOG.warn("handle insert stmt fail: {}", t.getMessage());
+                    // the transaction of this insert may already begin, we 
will abort it at outer finally block.
                     throw t;
                 }
             } else if (parsedStmt instanceof DdlStmt) {
@@ -1777,3 +1783,4 @@ public class StmtExecutor implements ProfileWriter {
         return parsedStmt;
     }
 }
+
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/ArrayLiteralTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/ArrayLiteralTest.java
new file mode 100644
index 0000000000..73e4f8f843
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ArrayLiteralTest.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+public class ArrayLiteralTest  {
+    @Test
+    public void testGetStringValueForArray() throws AnalysisException {
+        IntLiteral intLiteral1 = new IntLiteral(1);
+        FloatLiteral floatLiteral = new FloatLiteral("2.15");
+        BoolLiteral boolLiteral = new BoolLiteral(true);
+        StringLiteral stringLiteral = new StringLiteral("shortstring");
+        LargeIntLiteral largeIntLiteral = new 
LargeIntLiteral("1000000000000000000000");
+        NullLiteral nullLiteral = new NullLiteral();
+        DateLiteral dateLiteral = new DateLiteral("2022-10-10", Type.DATE);
+        DateLiteral datetimeLiteral = new DateLiteral("2022-10-10 12:10:10", 
Type.DATETIME);
+        ArrayLiteral arrayLiteral1 = new ArrayLiteral(intLiteral1, 
floatLiteral);
+        Assert.assertEquals("[\"1.0\", \"2.15\"]", 
arrayLiteral1.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral2 = new ArrayLiteral(boolLiteral, 
boolLiteral);
+        Assert.assertEquals("[\"1\", \"1\"]", 
arrayLiteral2.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral3 = new ArrayLiteral(stringLiteral, 
stringLiteral);
+        Assert.assertEquals("[\"shortstring\", \"shortstring\"]", 
arrayLiteral3.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral4 = new ArrayLiteral(largeIntLiteral, 
largeIntLiteral);
+        Assert.assertEquals("[\"1000000000000000000000\", 
\"1000000000000000000000\"]", arrayLiteral4.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral5 = new ArrayLiteral(nullLiteral, 
nullLiteral);
+        Assert.assertEquals("[null, null]", 
arrayLiteral5.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral6 = new ArrayLiteral(dateLiteral, 
dateLiteral);
+        Assert.assertEquals("[\"2022-10-10\", \"2022-10-10\"]", 
arrayLiteral6.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral7 = new ArrayLiteral(datetimeLiteral, 
datetimeLiteral);
+        Assert.assertEquals("[\"2022-10-10 12:10:10\", \"2022-10-10 
12:10:10\"]", arrayLiteral7.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral8 = new ArrayLiteral(arrayLiteral7, 
arrayLiteral7);
+        Assert.assertEquals("[[\"2022-10-10 12:10:10\", \"2022-10-10 
12:10:10\"], [\"2022-10-10 12:10:10\", \"2022-10-10 12:10:10\"]]",
+                arrayLiteral8.getStringValueForArray());
+
+        ArrayLiteral arrayLiteral9 = new ArrayLiteral();
+        Assert.assertEquals("[]", arrayLiteral9.getStringValueForArray());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java
index 5c69308331..c82e1b0844 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java
@@ -21,9 +21,13 @@ import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.utframe.UtFrameUtils;
 
@@ -104,7 +108,47 @@ public class InsertArrayStmtTest {
         Assert.assertSame(PrimitiveType.INT, ((ArrayType) 
arrayLiteral.getType()).getItemType().getPrimitiveType());
 
         connectContext.setQueryId(new TUniqueId(3, 0));
-        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "type 
not match",
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "can not 
cast from origin type",
                 () -> parseAndAnalyze("insert into test.table1 values (1, [[1, 
2], [3, 4]]);"));
     }
+
+    @Test
+    public void testTransactionalInsert() throws Exception {
+        Config.enable_new_load_scan_node = true;
+        ExceptionChecker.expectThrowsNoException(
+                () -> createTable("CREATE TABLE test.`txn_insert_tbl` (\n"
+                        + "  `k1` int(11) NULL,\n"
+                        + "  `k2` double NULL,\n"
+                        + "  `k3` varchar(100) NULL,\n"
+                        + "  `k4` array<int(11)> NULL,\n"
+                        + "  `k5` array<boolean> NULL\n"
+                        + ") ENGINE=OLAP\n"
+                        + "DUPLICATE KEY(`k1`)\n"
+                        + "COMMENT 'OLAP'\n"
+                        + "DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n"
+                        + "PROPERTIES (\n"
+                        + "\"replication_allocation\" = 
\"tag.location.default: 1\",\n"
+                        + "\"in_memory\" = \"false\",\n"
+                        + "\"storage_format\" = \"V2\",\n"
+                        + "\"disable_auto_compaction\" = \"false\"\n"
+                        + ");"));
+
+        SqlParser parser = new SqlParser(new SqlScanner(
+                new StringReader("begin"), 
connectContext.getSessionVariable().getSqlMode()
+        ));
+        TransactionBeginStmt beginStmt = (TransactionBeginStmt) 
SqlParserUtils.getFirstStmt(parser);
+        StmtExecutor stmtExecutor = new StmtExecutor(connectContext, 
beginStmt);
+        stmtExecutor.execute();
+
+        parser = new SqlParser(new SqlScanner(
+                new StringReader("insert into test.txn_insert_tbl values(2, 
3.3, \"xyz\", [1], [1, 0]);"),
+                connectContext.getSessionVariable().getSqlMode()
+        ));
+        InsertStmt insertStmt = (InsertStmt) 
SqlParserUtils.getFirstStmt(parser);
+        stmtExecutor = new StmtExecutor(connectContext, insertStmt);
+        stmtExecutor.execute();
+        QueryState state = connectContext.getState();
+        Assert.assertEquals(MysqlStateType.OK, state.getStateType());
+    }
 }
+
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index 6791f7cacd..49d20902c1 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -20,7 +20,10 @@
 // **Note**: default db will be create if not exist
 defaultDb = "regression_test"
 
-jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?"
+// add useLocalSessionState so that the jdbc will not send
+// init cmd like: select @@session.tx_read_only
+// at each time we connect.
+jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true"
 jdbcUser = "root"
 jdbcPassword = ""
 
diff --git a/regression-test/data/insert_p0/txn_insert.out 
b/regression-test/data/insert_p0/txn_insert.out
new file mode 100644
index 0000000000..5d66d45a39
--- /dev/null
+++ b/regression-test/data/insert_p0/txn_insert.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1 --
+\N     \N      \N      [NULL]  [NULL, 0]
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select2 --
+\N     \N      \N      [NULL]  [NULL, 0]
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select3 --
+\N     \N      \N      [NULL]  [NULL, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select4 --
+\N     \N      \N      [NULL]  [NULL, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy 
b/regression-test/suites/insert_p0/txn_insert.groovy
new file mode 100644
index 0000000000..64477ee8c3
--- /dev/null
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -0,0 +1,75 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases is copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+suite("txn_insert") {
+    def table = "txn_insert_tbl"
+    sql """ DROP TABLE IF EXISTS $table """
+    sql """
+        create table $table (
+            k1 int, 
+            k2 double,
+            k3 varchar(100),
+            k4 array<int>,
+            k5 array<boolean>
+        ) distributed by hash(k1) buckets 1
+        properties("replication_num" = "1"); 
+    """
+
+    // begin and commit
+    sql """begin"""
+    sql """insert into $table values(1, 2.2, "abc", [], [])"""
+    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+    sql """insert into $table values(null, null, null, [null], [null, 0])"""
+    sql "commit"
+    sql "sync"
+    order_qt_select1 """select * from $table"""
+
+    // begin and rollback
+    sql "begin"
+    sql """insert into $table values(1, 2.2, "abc", [], [])"""
+    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+    sql "rollback"
+    sql "sync"
+    order_qt_select2 """select * from $table"""
+
+    // begin 2 times and commit
+    sql "begin"
+    sql """insert into $table values(1, 2.2, "abc", [], [])"""
+    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+    sql "begin"
+    sql """insert into $table values(1, 2.2, "abc", [], [])"""
+    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+    sql "commit"
+    sql "sync"
+    order_qt_select3 """select * from $table"""
+
+    // begin 2 times and rollback
+    sql "begin"
+    sql """insert into $table values(1, 2.2, "abc", [], [])"""
+    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+    sql "begin"
+    sql """insert into $table values(1, 2.2, "abc", [], [])"""
+    sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])"""
+    sql "rollback"
+    sql "sync"
+    order_qt_select4 """select * from $table"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to