This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 24969d5ad42 [fix](group commit) Fix some group commit problems (#37595)
24969d5ad42 is described below

commit 24969d5ad42a21a2fe90e6a69c66c45bbdb54e1d
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Jul 11 15:20:02 2024 +0800

    [fix](group commit) Fix some group commit problems (#37595)
    
    ## Proposed changes
    
    Fix some problems found by
    https://github.com/apache/doris/pull/26856/files
    
    1. Some inserts should not run in group commit mode, including insert
    overwrite and inserts into MTMVs
    2. If there is no partition for the data, an exception should be thrown
---
 .../exec/group_commit_block_sink_operator.cpp      | 24 +++++++++++++++-------
 be/src/runtime/group_commit_mgr.cpp                | 24 +++++++++++-----------
 be/src/runtime/group_commit_mgr.h                  |  6 ++----
 .../glue/translator/PhysicalPlanTranslator.java    |  5 +++--
 .../commands/insert/InsertIntoTableCommand.java    |  4 +++-
 .../insert/OlapGroupCommitInsertExecutor.java      |  3 +++
 .../insert_p0/insert_group_commit_into.groovy      |  2 ++
 regression-test/suites/insert_p0/txn_insert.groovy |  2 +-
 .../test_group_commit_stream_load.groovy           |  2 +-
 9 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 424ede07be5..9bac6d4ed29 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -61,21 +61,19 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* 
state) {
                                                         
"CreateGroupCommitPlanDependency", true);
     _put_block_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                                       
"GroupCommitPutBlockDependency", true);
-    WARN_IF_ERROR(_initialize_load_queue(), "");
+    [[maybe_unused]] auto st = _initialize_load_queue();
     return Status::OK();
 }
 
 Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
     auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
     if (_state->exec_env()->wal_mgr()->is_running()) {
-        std::string label;
-        int64_t txn_id;
         
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
                 p._db_id, p._table_id, p._base_schema_version, p._load_id, 
_load_block_queue,
                 _state->be_exec_version(), _state->query_mem_tracker(), 
_create_plan_dependency,
-                _put_block_dependency, label, txn_id));
-        _state->set_import_label(label);
-        _state->set_wal_id(txn_id); // wal_id is txn_id
+                _put_block_dependency));
+        _state->set_import_label(_load_block_queue->label);
+        _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
         return Status::OK();
     } else {
         return Status::InternalError("be is stopping");
@@ -339,13 +337,25 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* 
state, vectorized::Bloc
             local_state._vpartition->find_partition(block.get(), index,
                                                     
local_state._partitions[index]);
         }
+        bool stop_processing = false;
         for (int row_index = 0; row_index < rows; row_index++) {
             if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
                 local_state._filter_bitmap.Set(row_index, true);
                 LOG(WARNING) << "no partition for this tuple. tuple="
                              << block->dump_data(row_index, 1);
+                RETURN_IF_ERROR(state->append_error_msg_to_file(
+                        []() -> std::string { return ""; },
+                        [&]() -> std::string {
+                            fmt::memory_buffer buf;
+                            fmt::format_to(buf, "no partition for this tuple. 
tuple=\n{}",
+                                           block->dump_data(row_index, 1));
+                            return fmt::to_string(buf);
+                        },
+                        &stop_processing));
+                local_state._has_filtered_rows = true;
+                state->update_num_rows_load_filtered(1);
+                state->update_num_rows_load_total(-1);
             }
-            local_state._has_filtered_rows = true;
         }
     }
 
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index f0a57162458..5f989da023b 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -138,7 +138,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
             auto last_print_duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(
                                                
std::chrono::steady_clock::now() - _last_print_time)
                                                .count();
-            if (last_print_duration >= 5000) {
+            if (last_print_duration >= 10000) {
                 _last_print_time = std::chrono::steady_clock::now();
                 LOG(INFO) << "find one group_commit need to commit, txn_id=" 
<< txn_id
                           << ", label=" << label << ", instance_id=" << 
load_instance_id
@@ -258,15 +258,13 @@ Status GroupCommitTable::get_first_block_load_queue(
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
         std::shared_ptr<MemTrackerLimiter> mem_tracker,
         std::shared_ptr<pipeline::Dependency> create_plan_dep,
-        std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& 
label, int64_t& txn_id) {
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     DCHECK(table_id == _table_id);
     std::unique_lock l(_lock);
     auto try_to_get_matched_queue = [&]() -> Status {
         for (const auto& [_, inner_block_queue] : _load_block_queues) {
             if (inner_block_queue->contain_load_id(load_id)) {
                 load_block_queue = inner_block_queue;
-                label = inner_block_queue->label;
-                txn_id = inner_block_queue->txn_id;
                 return Status::OK();
             }
         }
@@ -275,8 +273,6 @@ Status GroupCommitTable::get_first_block_load_queue(
                 if (base_schema_version == inner_block_queue->schema_version) {
                     if (inner_block_queue->add_load_id(load_id, 
put_block_dep).ok()) {
                         load_block_queue = inner_block_queue;
-                        label = inner_block_queue->label;
-                        txn_id = inner_block_queue->txn_id;
                         return Status::OK();
                     }
                 } else {
@@ -319,6 +315,10 @@ Status GroupCommitTable::get_first_block_load_queue(
 
 void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
     std::unique_lock l(_lock);
+    if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) {
+        _create_plan_deps.erase(load_id);
+        return;
+    }
     for (const auto& [_, inner_block_queue] : _load_block_queues) {
         if (inner_block_queue->remove_load_id(load_id).ok()) {
             return;
@@ -391,13 +391,13 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
                     instance_id, label, txn_id, schema_version, 
_all_block_queues_bytes,
                     result.wait_internal_group_commit_finish, 
result.group_commit_interval_ms,
                     result.group_commit_data_bytes);
-            std::unique_lock l(_lock);
             RETURN_IF_ERROR(load_block_queue->create_wal(
                     _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
                     
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
                     be_exe_version));
-            _load_block_queues.emplace(instance_id, load_block_queue);
 
+            std::unique_lock l(_lock);
+            _load_block_queues.emplace(instance_id, load_block_queue);
             std::vector<UniqueId> success_load_ids;
             for (const auto& [id, load_info] : _create_plan_deps) {
                 auto create_dep = std::get<0>(load_info);
@@ -409,8 +409,8 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
                     }
                 }
             }
-            for (const auto& id2 : success_load_ids) {
-                _create_plan_deps.erase(id2);
+            for (const auto& id : success_load_ids) {
+                _create_plan_deps.erase(id);
             }
         }
     }
@@ -610,7 +610,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
         std::shared_ptr<MemTrackerLimiter> mem_tracker,
         std::shared_ptr<pipeline::Dependency> create_plan_dep,
-        std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& 
label, int64_t& txn_id) {
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
@@ -623,7 +623,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
     }
     RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
             table_id, base_schema_version, load_id, load_block_queue, 
be_exe_version, mem_tracker,
-            create_plan_dep, put_block_dep, label, txn_id));
+            create_plan_dep, put_block_dep));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 16c7e0c24d3..c6cb34a022a 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -160,8 +160,7 @@ public:
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
                                       std::shared_ptr<pipeline::Dependency> 
create_plan_dep,
-                                      std::shared_ptr<pipeline::Dependency> 
put_block_dep,
-                                      std::string& label, int64_t& txn_id);
+                                      std::shared_ptr<pipeline::Dependency> 
put_block_dep);
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep);
@@ -211,8 +210,7 @@ public:
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
                                       std::shared_ptr<pipeline::Dependency> 
create_plan_dep,
-                                      std::shared_ptr<pipeline::Dependency> 
put_block_dep,
-                                      std::string& label, int64_t& txn_id);
+                                      std::shared_ptr<pipeline::Dependency> 
put_block_dep);
     void remove_load_id(int64_t table_id, const UniqueId& load_id);
     std::promise<Status> debug_promise;
     std::future<Status> debug_future = debug_promise.get_future();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 62c823fdec4..b4a957e72fc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -432,8 +432,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         // This statement is only used in the group_commit mode
         if (context.getConnectContext().isGroupCommit()) {
             sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(), 
olapTuple,
-                olapTableSink.getTargetTable().getPartitionIds(), 
olapTableSink.isSingleReplicaLoad(),
-                context.getSessionVariable().getGroupCommit(), 0);
+                    olapTableSink.getTargetTable().getPartitionIds(), 
olapTableSink.isSingleReplicaLoad(),
+                    context.getSessionVariable().getGroupCommit(),
+                    
ConnectContext.get().getSessionVariable().getEnableInsertStrict() ? 0 : 1);
         } else {
             sink = new OlapTableSink(
                 olapTableSink.getTargetTable(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 7a1280092b0..df6fcd69fcb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -146,7 +146,9 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
             if (cte.isPresent()) {
                 this.logicalQuery = ((LogicalPlan) 
cte.get().withChildren(logicalQuery));
             }
-            if (this.logicalQuery instanceof UnboundTableSink) {
+            boolean isOverwrite = insertCtx.isPresent() && insertCtx.get() 
instanceof OlapInsertCommandContext
+                    && ((OlapInsertCommandContext) 
insertCtx.get()).isOverwrite();
+            if (this.logicalQuery instanceof UnboundTableSink && !isOverwrite) 
{
                 OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, 
targetTableIf,
                         (UnboundTableSink<?>) this.logicalQuery);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 5091bae17d1..984e8b0c8ca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -17,12 +17,14 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
@@ -60,6 +62,7 @@ public class OlapGroupCommitInsertExecutor extends 
OlapInsertExecutor {
                 && ((OlapTable) 
table).getTableProperty().getUseSchemaLightChange()
                 && !((OlapTable) 
table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
                 && tableSink.getPartitions().isEmpty()
+                && (!(table instanceof MTMV) || 
MTMVUtil.allowModifyMTMVData(ctx))
                 && (tableSink.child() instanceof OneRowRelation || 
tableSink.child() instanceof LogicalUnion));
     }
 
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 9105a8dc8db..e1cb943a9ab 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -219,7 +219,9 @@ suite("insert_group_commit_into") {
                 // 7. insert into and add rollup
                 group_commit_insert """ insert into ${table}(name, id) 
values('c', 3);  """, 1
                 group_commit_insert """ insert into ${table}(id) values(4);  
""", 1
+                sql "set enable_insert_strict=false"
                 group_commit_insert """ insert into ${table} values (1, 'a', 
10),(5, 'q', 50),(101, 'a', 100);  """, 2
+                sql "set enable_insert_strict=true"
                 sql """ alter table ${table} ADD ROLLUP r1(name, score); """
                 group_commit_insert_with_retry """ insert into ${table}(id, 
name) values(2, 'b');  """, 1
                 group_commit_insert_with_retry """ insert into ${table}(id) 
values(6); """, 1
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy 
b/regression-test/suites/insert_p0/txn_insert.groovy
index 088e1134900..4c42c3b12c4 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -495,7 +495,7 @@ suite("txn_insert") {
                     assertFalse(true, "should not reach here")
                 } catch (Exception e) {
                     logger.info("exception: " + e)
-                    assertTrue(e.getMessage().contains("The transaction is 
already timeout"))
+                    assertTrue(e.getMessage().contains("The transaction is 
already timeout") || e.getMessage().contains("Execute timeout"))
                 } finally {
                     try {
                         sql "rollback"
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index e12a1f2f01b..0f823f1d4ef 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                checkStreamLoadResult(exception, result, 6, 3, 2, 1)
+                checkStreamLoadResult(exception, result, 6, 2, 3, 1)
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to