This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 45b2dbab6af [improve](group commit) Group commit supports max filter ratio when the row count is less than the configured value (#28139)
45b2dbab6af is described below

commit 45b2dbab6afa093f3402ac919929f97ff28f5c9a
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Dec 12 16:33:36 2023 +0800

    [improve](group commit) Group commit supports max filter ratio when the row count is less than the configured value (#28139)
---
 be/src/common/config.cpp                           |   3 +-
 be/src/common/config.h                             |   3 +-
 be/src/runtime/group_commit_mgr.cpp                |   6 +-
 be/src/runtime/group_commit_mgr.h                  |   2 -
 .../runtime/stream_load/stream_load_executor.cpp   |   9 +
 be/src/vec/sink/group_commit_block_sink.cpp        |  61 +++-
 be/src/vec/sink/group_commit_block_sink.h          |   5 +
 .../apache/doris/analysis/NativeInsertStmt.java    |   2 +-
 .../apache/doris/planner/GroupCommitBlockSink.java |   5 +-
 .../apache/doris/planner/GroupCommitPlanner.java   |   5 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |   6 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   5 +-
 gensrc/thrift/DataSinks.thrift                     |   1 +
 .../insert_group_commit_into_max_filter_ratio.out  |  34 +++
 .../data/insert_p0/test_group_commit_10.csv        |   4 +
 .../data/insert_p0/test_group_commit_11.csv.gz     | Bin 0 -> 35223 bytes
 .../http_stream/test_group_commit_http_stream.out  |   3 -
 ...nsert_group_commit_into_max_filter_ratio.groovy | 339 +++++++++++++++++++++
 .../insert_group_commit_with_prepare_stmt.groovy   |   2 +
 .../test_group_commit_http_stream.groovy           |  14 +-
 .../test_group_commit_stream_load.groovy           |   2 +-
 21 files changed, 475 insertions(+), 36 deletions(-)
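
The core of the change, in brief: the BE sink now buffers incoming blocks in
memory while the scanned row count stays at or below
config::group_commit_memory_rows_for_max_filter_ratio (default 10000), so that
at close time it can still evaluate the real filter ratio and abort the load
with a DataQualityError instead of committing partially filtered data. A
minimal sketch of that close-time check (an illustrative free-function form,
not the committed API; the committed logic lives in
GroupCommitBlockSink::close() below):

    #include <cstdint>

    // Sketch of the max-filter-ratio check; names mirror the patch, but this
    // standalone function is illustrative only.
    bool exceeds_max_filter_ratio(int64_t total_rows, int64_t unselected_rows,
                                  int64_t filtered_rows, double max_filter_ratio) {
        // Unselected rows do not count against the ratio; only rows that
        // failed data-quality checks do.
        int64_t num_selected_rows = total_rows - unselected_rows;
        return num_selected_rows > 0 &&
               static_cast<double>(filtered_rows) / num_selected_rows > max_filter_ratio;
    }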

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 549f6a0db0d..c97959f2e4a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1100,10 +1100,11 @@ DEFINE_Int16(bitmap_serialize_version, "1");
 DEFINE_String(group_commit_replay_wal_dir, "./wal");
 DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
 DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
-DEFINE_Bool(wait_internal_group_commit_finish, "false");
 
 // the count of thread to group commit insert
 DEFINE_Int32(group_commit_insert_threads, "10");
+DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
+DEFINE_Bool(wait_internal_group_commit_finish, "false");
 
 DEFINE_mInt32(scan_thread_nice_value, "0");
 DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bad4724ac91..e3dbe3234c4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1173,10 +1173,11 @@ DECLARE_Int16(bitmap_serialize_version);
 DECLARE_String(group_commit_replay_wal_dir);
 DECLARE_Int32(group_commit_replay_wal_retry_num);
 DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
-DECLARE_Bool(wait_internal_group_commit_finish);
 
 // This config can be set to limit thread number in group commit insert thread pool.
 DECLARE_mInt32(group_commit_insert_threads);
+DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
+DECLARE_Bool(wait_internal_group_commit_finish);
 
 // The configuration item is used to lower the priority of the scanner thread,
 // typically employed to ensure CPU scheduling for write operations.
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 224d446b661..b97d5de8a54 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -133,8 +133,7 @@ void LoadBlockQueue::cancel(const Status& st) {
 
 Status GroupCommitTable::get_first_block_load_queue(
         int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
-        std::shared_ptr<vectorized::Block> block, std::shared_ptr<LoadBlockQueue>& load_block_queue,
-        int be_exe_version) {
+        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
     DCHECK(table_id == _table_id);
     {
         std::unique_lock l(_lock);
@@ -425,7 +424,6 @@ void GroupCommitMgr::stop() {
 Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id,
                                                   int64_t base_schema_version,
                                                   const UniqueId& load_id,
-                                                  std::shared_ptr<vectorized::Block> block,
                                                   std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                                   int be_exe_version) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
@@ -439,7 +437,7 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
         group_commit_table = _table_map[table_id];
     }
     return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
-                                                          block, load_block_queue, be_exe_version);
+                                                          load_block_queue, be_exe_version);
 }
 
 Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 90b0e7a040f..44826cd9465 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -100,7 +100,6 @@ public:
               _all_block_queues_bytes(all_block_queue_bytes) {};
     Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
                                       const UniqueId& load_id,
-                                      std::shared_ptr<vectorized::Block> block,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version);
     Status get_load_block_queue(const TUniqueId& instance_id,
@@ -142,7 +141,6 @@ public:
                                 std::shared_ptr<LoadBlockQueue>& load_block_queue);
     Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
                                       const UniqueId& load_id,
-                                      std::shared_ptr<vectorized::Block> block,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version);
 
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 50bb2407564..1fc8eb81207 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -101,6 +101,15 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
                         ctx->number_loaded_rows);
             }
         } else {
+            if (ctx->group_commit && status->is<DATA_QUALITY_ERROR>()) {
+                ctx->number_total_rows = state->num_rows_load_total();
+                ctx->number_loaded_rows = state->num_rows_load_success();
+                ctx->number_filtered_rows = state->num_rows_load_filtered();
+                ctx->number_unselected_rows = state->num_rows_load_unselected();
+                if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
+                    ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
+                }
+            }
             LOG(WARNING) << "fragment execute failed"
                          << ", query_id=" << 
UniqueId(ctx->put_result.params.params.query_id)
                          << ", err_msg=" << status->to_string() << ", " << 
ctx->brief();
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 2d40f94a543..bb5c5c70d0c 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -49,6 +49,7 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
     _base_schema_version = table_sink.base_schema_version;
     _group_commit_mode = table_sink.group_commit_mode;
     _load_id = table_sink.load_id;
+    _max_filter_ratio = table_sink.max_filter_ratio;
     return Status::OK();
 }
 
@@ -84,18 +85,28 @@ Status GroupCommitBlockSink::open(RuntimeState* state) {
 }
 
 Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
-    if (_load_block_queue) {
-        _load_block_queue->remove_load_id(_load_id);
-    }
     RETURN_IF_ERROR(DataSink::close(state, close_status));
     RETURN_IF_ERROR(close_status);
-    // wait to wal
     int64_t total_rows = state->num_rows_load_total();
     int64_t loaded_rows = state->num_rows_load_total();
-    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
-                                         loaded_rows);
     state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
                                    state->num_rows_load_filtered());
+    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
+                                         loaded_rows);
+    if (!_is_block_appended) {
+        // if not meet the max_filter_ratio, we should return error status directly
+        int64_t num_selected_rows =
+                state->num_rows_load_total() - state->num_rows_load_unselected();
+        if (num_selected_rows > 0 &&
+            (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
+            return Status::DataQualityError("too many filtered rows");
+        }
+        RETURN_IF_ERROR(_add_blocks());
+    }
+    if (_load_block_queue) {
+        _load_block_queue->remove_load_id(_load_id);
+    }
+    // wait to wal
     auto st = Status::OK();
     if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish ||
                               _group_commit_mode == TGroupCommitMode::SYNC_MODE)) {
@@ -148,6 +159,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
     if (block->rows() == 0) {
         return Status::OK();
     }
+    // the insert group commit tvf always accept nullable columns, so we should convert
+    // the non-nullable columns to nullable columns
     for (int i = 0; i < block->columns(); ++i) {
         if (block->get_by_position(i).type->is_nullable()) {
             continue;
@@ -166,22 +179,42 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
     }
     std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared();
     output_block->swap(cur_mutable_block->to_block());
+    if (!_is_block_appended && state->num_rows_load_total() + state->num_rows_load_unselected() +
+                                               state->num_rows_load_filtered() <=
+                                       config::group_commit_memory_rows_for_max_filter_ratio) {
+        _blocks.emplace_back(output_block);
+    } else {
+        if (!_is_block_appended) {
+            RETURN_IF_ERROR(_add_blocks());
+        }
+        RETURN_IF_ERROR(_load_block_queue->add_block(
+                output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+    }
+    return Status::OK();
+}
+
+Status GroupCommitBlockSink::_add_blocks() {
+    DCHECK(_is_block_appended == false);
     TUniqueId load_id;
     load_id.__set_hi(_load_id.hi);
     load_id.__set_lo(_load_id.lo);
     if (_load_block_queue == nullptr) {
-        if (state->exec_env()->wal_mgr()->is_running()) {
-            RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                    _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue,
-                    state->be_exec_version()));
-            state->set_import_label(_load_block_queue->label);
-            state->set_wal_id(_load_block_queue->txn_id);
+        if (_state->exec_env()->wal_mgr()->is_running()) {
+            RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
+                    _db_id, _table_id, _base_schema_version, load_id, _load_block_queue,
+                    _state->be_exec_version()));
+            _state->set_import_label(_load_block_queue->label);
+            _state->set_wal_id(_load_block_queue->txn_id);
         } else {
             return Status::InternalError("be is stopping");
         }
     }
-    RETURN_IF_ERROR(_load_block_queue->add_block(
-            output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+    for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
+        RETURN_IF_ERROR(_load_block_queue->add_block(
+                *it, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+    }
+    _is_block_appended = true;
+    _blocks.clear();
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 4be5a5514cb..2ae37be368a 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,6 +47,7 @@ public:
 
 private:
     Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
+    Status _add_blocks();
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
@@ -65,6 +66,10 @@ private:
     TGroupCommitMode::type _group_commit_mode;
     UniqueId _load_id;
     std::shared_ptr<LoadBlockQueue> _load_block_queue;
+    // used to calculate if meet the max filter ratio
+    std::vector<std::shared_ptr<vectorized::Block>> _blocks;
+    bool _is_block_appended = false;
+    double _max_filter_ratio = 0.0;
 };
 
 } // namespace vectorized
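
To make the buffering concrete: _add_block() either holds a block in memory or
streams it onward, depending on how many rows have been seen so far. A minimal
sketch of that decision (a hypothetical standalone form with simplified types;
the committed logic is in GroupCommitBlockSink::_add_block() above):

    #include <cstdint>
    #include <memory>
    #include <utility>
    #include <vector>

    struct Block {}; // stand-in for vectorized::Block

    struct Sink {
        std::vector<std::shared_ptr<Block>> blocks; // buffered, like _blocks
        bool is_block_appended = false;             // like _is_block_appended
    };

    // Buffer while rows_seen is within the threshold so close() can still
    // reject the whole load; once past it, flush the buffer and hand blocks
    // straight to the shared load queue.
    void add_block(Sink& s, std::shared_ptr<Block> block, int64_t rows_seen,
                   int64_t threshold /* group_commit_memory_rows_for_max_filter_ratio */) {
        if (!s.is_block_appended && rows_seen <= threshold) {
            s.blocks.emplace_back(std::move(block));
            return;
        }
        if (!s.is_block_appended) {
            // flush previously buffered blocks to the load queue here
            s.blocks.clear();
            s.is_block_appended = true;
        }
        // then add `block` to the load queue directly
    }
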
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index dec20ab480e..f523e3e594f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -970,7 +970,7 @@ public class NativeInsertStmt extends InsertStmt {
             if (isGroupCommitStreamLoadSql) {
                 sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
                         targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
-                        ConnectContext.get().getSessionVariable().getGroupCommit());
+                        ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
             } else {
                 sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
                         analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index 14ecda1a5b2..8030dcf4e5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -29,11 +29,13 @@ import java.util.List;
 
 public class GroupCommitBlockSink extends OlapTableSink {
     private String groupCommit;
+    private double maxFilterRatio;
 
     public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
-            boolean singleReplicaLoad, String groupCommit) {
+            boolean singleReplicaLoad, String groupCommit, double maxFilterRatio) {
         super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
         this.groupCommit = groupCommit;
+        this.maxFilterRatio = maxFilterRatio;
     }
 
     protected TDataSinkType getDataSinkType() {
@@ -45,6 +47,7 @@ public class GroupCommitBlockSink extends OlapTableSink {
         TGroupCommitMode groupCommitMode = parseGroupCommit(groupCommit);
         Preconditions.checkNotNull(groupCommitMode, "Group commit is: " + groupCommit);
         tDataSink.olap_table_sink.setGroupCommitMode(groupCommitMode);
+        tDataSink.olap_table_sink.setMaxFilterRatio(maxFilterRatio);
         return tDataSink;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index b3557bd5634..bc99d771d91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -98,11 +98,12 @@ public class GroupCommitPlanner {
         }
         streamLoadPutRequest
                 .setDb(db.getFullName())
-                .setMaxFilterRatio(1)
+                .setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0 : 1)
                 .setTbl(table.getName())
                 .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                 .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
-                .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit);
+                .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit)
+                .setStrictMode(ConnectContext.get().getSessionVariable().enableInsertStrict);
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         // Will using load id as query id in fragment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 65fe2163695..bf3261d3944 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -261,7 +261,8 @@ public class StreamLoadPlanner {
         OlapTableSink olapTableSink;
         if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
+                    taskInfo.getMaxFilterRatio());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }
@@ -481,7 +482,8 @@ public class StreamLoadPlanner {
         OlapTableSink olapTableSink;
         if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
+                    taskInfo.getMaxFilterRatio());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 463b54771bb..0aabb8d7f34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -176,6 +176,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.ProtocolStringList;
 import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -1891,7 +1892,9 @@ public class StmtExecutor {
                 List<InternalService.PDataRow> rows = groupCommitPlanner.getRows(nativeInsertStmt);
                 PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
                 TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
-                if (code == TStatusCode.DATA_QUALITY_ERROR) {
+                ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
+                if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
+                        .contains("schema version not match")) {
                     LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "
                                     + "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt,
                             groupCommitPlanner.getBackend().getId(),
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 40c0e760e85..daf3bcc06a9 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -266,6 +266,7 @@ struct TOlapTableSink {
     // used by GroupCommitBlockSink
     21: optional i64 base_schema_version
     22: optional TGroupCommitMode group_commit_mode
+    23: optional double max_filter_ratio
 }
 
 struct TDataSink {
diff --git a/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
new file mode 100644
index 00000000000..62743feeb6c
--- /dev/null
+++ b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
@@ -0,0 +1,34 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      a       10
+2      \N      -1
+3      a       10
+9      a       \N
+
+-- !sql --
+1      a       10
+2      \N      -1
+3      a       10
+6      a       \N
+7      a       \N
+9      a       \N
+
+-- !sql --
+1      a       21
+1      a       21
+2      b       22
+2      b       22
+3      c       23
+3      c       23
+4      d       \N
+
+-- !sql --
+1      a       21
+1      a       21
+2      b       22
+2      b       22
+3      c       23
+3      c       23
+4      d       \N
+4      d       \N
+
diff --git a/regression-test/data/insert_p0/test_group_commit_10.csv b/regression-test/data/insert_p0/test_group_commit_10.csv
new file mode 100644
index 00000000000..ef677f06936
--- /dev/null
+++ b/regression-test/data/insert_p0/test_group_commit_10.csv
@@ -0,0 +1,4 @@
+1,a,21
+2,b,22
+3,c,23
+4,d,a
\ No newline at end of file
diff --git a/regression-test/data/insert_p0/test_group_commit_11.csv.gz b/regression-test/data/insert_p0/test_group_commit_11.csv.gz
new file mode 100644
index 00000000000..59c57ffdbf0
Binary files /dev/null and b/regression-test/data/insert_p0/test_group_commit_11.csv.gz differ
diff --git a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
index abe3210dd81..57c2525815a 100644
--- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
@@ -19,9 +19,6 @@
 6      f       60
 7      e       70
 8      f       80
-10     a       10
-11     a       11
-12     a       \N
 
 -- !sql --
 2402288
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
new file mode 100644
index 00000000000..02f3ca1e7a9
--- /dev/null
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.mysql.cj.jdbc.StatementImpl
+
+suite("insert_group_commit_into_max_filter_ratio") {
+    def dbName = "regression_test_insert_p0"
+    def tableName = "insert_group_commit_into_max_filter_ratio"
+    def dbTableName = dbName + "." + tableName
+
+    def get_row_count = { expectedRowCount ->
+        def rowCount = sql "select count(*) from ${dbTableName}"
+        logger.info("rowCount: " + rowCount + ", expecedRowCount: " + 
expectedRowCount)
+        assertEquals(expectedRowCount, rowCount[0][0])
+    }
+
+    def get_row_count_with_retry = { expectedRowCount ->
+        def retry = 0
+        while (retry < 30) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${dbTableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    def group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql}  """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " 
+ expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'PREPARE'"))
+        assertTrue(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    def off_mode_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql}  """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " 
+ expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'VISIBLE'"))
+        assertFalse(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    def fail_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql}  """
+        try {
+            def result = stmt.executeUpdate()
+            logger.info("insert result: " + result)
+            def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+            logger.info("result server info: " + serverInfo)
+            if (result != expected_row_count) {
+                logger.warn("insert result: " + result + ", 
expected_row_count: " + expected_row_count + ", sql: " + sql)
+            }
+            // assertEquals(result, expected_row_count)
+            assertTrue(serverInfo.contains("'status':'ABORTED'"))
+            // assertFalse(serverInfo.contains("'label':'group_commit_"))
+        } catch (Exception e) {
+            logger.info("exception: " + e)
+        }
+    }
+
+    def check_stream_load_result = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def check_stream_load_result_with_exception = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("fail", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        // assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+        assertTrue(json.Message.contains("too many filtered rows"))
+    }
+
+    def check_off_mode_stream_load_result = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertFalse(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    // create table
+    sql """ drop table if exists ${tableName}; """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL,
+            `type` varchar(1) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "group_commit_interval_ms" = "1000"
+        );
+    """
+
+    // insert
+    // legacy, nereids
+    // if enable strict mode
+    // 100 rows(success, fail), 10000 rows(success, fail), 15000 rows(success, fail)
+    // async mode, sync mode, off mode
+    for (item in ["legacy", "nereids"]) {
+        sql """ truncate table ${tableName} """
+        connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+            if (item == "nereids") {
+                sql """ set enable_nereids_dml = true; """
+                sql """ set enable_nereids_planner=true; """
+                // sql """ set enable_fallback_to_original_planner=false; """
+            } else {
+                sql """ set enable_nereids_dml = false; """
+            }
+
+            sql """ set group_commit = sync_mode; """
+            group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10); """, 1
+            sql """ set group_commit = async_mode; """
+            group_commit_insert """ insert into ${dbTableName}(id) select 2; """, 1
+            sql """ set group_commit = off_mode; """
+            off_mode_group_commit_insert """ insert into ${dbTableName} values (3, 'a', 10); """, 1
+            sql """ set group_commit = async_mode; """
+            fail_group_commit_insert """ insert into ${dbTableName} values (4, 'abc', 10); """, 0
+            sql """ set enable_insert_strict = false; """
+            group_commit_insert """ insert into ${dbTableName} values (5, 'abc', 10); """, 0
+
+            // The row 6 and 7 is different between legacy and nereids
+            try {
+                sql """ set group_commit = off_mode; """
+                sql """ set enable_insert_strict = true; """
+                sql """ insert into ${dbTableName} values (6, 'a', 'a'); """
+            } catch (Exception e) {
+                logger.info("exception: " + e)
+                assertTrue(e.toString().contains("Invalid number format"))
+            }
+
+            try {
+                sql """ set group_commit = off_mode; """
+                sql """ set enable_insert_strict = false; """
+                sql """ insert into ${dbTableName} values (7, 'a', 'a'); """
+            } catch (Exception e) {
+                logger.info("exception: " + e)
+                assertTrue(e.toString().contains("Invalid number format"))
+            }
+
+            // TODO should throw exception?
+            sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = true; """
+            fail_group_commit_insert """ insert into ${dbTableName} values (8, 'a', 'a'); """, 0
+
+            sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = false; """
+            group_commit_insert """ insert into ${dbTableName} values (9, 'a', 'a'); """, 0
+        }
+        get_row_count_with_retry(4 + item == "nereids" ? 2 : 0)
+        order_qt_sql """ select * from ${dbTableName} """
+    }
+    sql """ truncate table ${tableName} """
+
+    // 2. stream load(async or sync mode, strict mode, max_filter_ratio, 10000 rows)
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'async_mode'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count_with_retry(4)
+
+    // sync_mode, strict_mode = true, max_filter_ratio = 0
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result_with_exception(exception, result, 4, 3, 1, 0)
+        }
+    }
+    get_row_count(4)
+
+    // sync_mode, strict_mode = true, max_filter_ratio = 0.3
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+        set 'max_filter_ratio', '0.3'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 3, 1, 0)
+        }
+    }
+    get_row_count(7)
+
+    order_qt_sql """ select * from ${tableName} """
+
+    // 10001 rows
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_11.csv.gz"
+        unset 'label'
+        set 'compress_type', 'gz'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 10001, 10000, 1, 0)
+        }
+    }
+    get_row_count(10007)
+    sql """ truncate table ${tableName} """
+
+    // 3. http stream(async or sync mode, strict mode, max_filter_ratio, 10000 rows)
+    streamLoad {
+        set 'version', '1'
+        set 'sql', """
+                    insert into ${dbTableName} select * from http_stream
+                    ("format"="csv", "column_separator"=",")
+                """
+        set 'group_commit', 'sync_mode'
+        file "test_group_commit_10.csv"
+        unset 'label'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count_with_retry(4)
+
+    // not use group commit
+    streamLoad {
+        set 'version', '1'
+        set 'sql', """
+                    insert into ${dbTableName} select * from http_stream
+                    ("format"="csv", "column_separator"=",")
+                """
+        file "test_group_commit_10.csv"
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_off_mode_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count(8)
+
+    order_qt_sql """ select * from ${tableName} """
+}
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
index 144cc30c095..72d52229816 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
@@ -100,6 +100,7 @@ suite("insert_group_commit_with_prepare_stmt") {
             """
 
             sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = false; """
 
             // 1. insert into
             def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """
@@ -159,6 +160,7 @@ suite("insert_group_commit_with_prepare_stmt") {
             """
 
             sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = false; """
 
             // 1. insert into
             def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """
diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index de2581e0237..6909a919c67 100644
--- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -212,14 +212,22 @@ suite("test_group_commit_http_stream") {
 
             set 'group_commit', 'async_mode'
             file "test_stream_load3.csv"
-            set 'max_filter_ratio', '0.7'
+            // TODO max_filter_ratio is not supported http_stream
+            // set 'max_filter_ratio', '0.7'
             unset 'label'
 
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
                 // TODO different with stream load: 6, 2, 3, 1
-                checkStreamLoadResult(exception, result, 6, 4, 2, 0)
+                // checkStreamLoadResult(exception, result, 5, 4, 1, 0)
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(json.Message.contains("too many filtered rows"))
             }
         }
 
@@ -246,7 +254,7 @@ suite("test_group_commit_http_stream") {
             }
         }
 
-        getRowCount(22)
+        getRowCount(19)
         qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
     } finally {
         // try_sql("DROP TABLE ${tableName}")
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index 6ae9fb13b4d..b60b6dc5555 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                checkStreamLoadResult(exception, result, 6, 2, 3, 1)
+                checkStreamLoadResult(exception, result, 6, 3, 2, 1)
             }
         }
 

