This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5766047c616 branch-3.0: [fix](group commit) fix group commit with schema change (#51144) (#51306) 5766047c616 is described below commit 5766047c616a29a0fb389203bd4b0735bbb17f2b Author: meiyi <me...@selectdb.com> AuthorDate: Thu May 29 10:07:54 2025 +0800 branch-3.0: [fix](group commit) fix group commit with schema change (#51144) (#51306) pick https://github.com/apache/doris/pull/51144 --- be/src/olap/wal/wal_table.cpp | 1 + .../exec/group_commit_block_sink_operator.cpp | 8 +- be/src/runtime/group_commit_mgr.cpp | 36 +++--- be/src/runtime/group_commit_mgr.h | 15 ++- .../org/apache/doris/alter/SchemaChangeJobV2.java | 17 +++ .../apache/doris/service/FrontendServiceImpl.java | 4 + .../test_group_commit_schema_change.out | Bin 0 -> 115 bytes .../group_commit/test_group_commit_error.groovy | 19 +++ .../test_group_commit_replay_wal.groovy | 15 ++- .../test_group_commit_schema_change.groovy | 135 +++++++++++++++++++++ 10 files changed, 224 insertions(+), 26 deletions(-) diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index a671717b50f..aed180c86a3 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -251,6 +251,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, ctx->group_commit = false; ctx->load_type = TLoadType::MANUL_LOAD; ctx->load_src_type = TLoadSourceType::RAW; + ctx->max_filter_ratio = 1; auto st = _http_stream_action->process_put(nullptr, ctx); if (st.ok()) { // wait stream load finish diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index a9201f0302f..8b1ef1be95a 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -68,9 +68,9 @@ Status GroupCommitBlockSinkLocalState::_initialize_load_queue() { auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>(); if (_state->exec_env()->wal_mgr()->is_running()) { RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( - p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue, - _state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency, - _put_block_dependency)); + p._db_id, p._table_id, p._base_schema_version, p._schema->indexes().size(), + p._load_id, _load_block_queue, _state->be_exec_version(), + _state->query_mem_tracker(), _create_plan_dependency, _put_block_dependency)); _state->set_import_label(_load_block_queue->label); _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id return Status::OK(); @@ -259,7 +259,7 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) { RETURN_IF_ERROR(_schema->init(table_sink.schema)); _db_id = table_sink.db_id; _table_id = table_sink.table_id; - _base_schema_version = table_sink.base_schema_version; + _base_schema_version = _schema->version(); _partition = table_sink.partition; _group_commit_mode = table_sink.group_commit_mode; _load_id = table_sink.load_id; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index f49d6708bcc..0f22dbf4573 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -46,6 +46,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, return runtime_state->cancel_reason(); } RETURN_IF_ERROR(status); + DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block", DBUG_BLOCK); if (block->rows() > 0) { if (!config::group_commit_wait_replay_wal_finish) { _block_queue.emplace_back(block); @@ -144,7 +145,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* << ", duration=" << duration << ", load_ids=" << get_load_ids(); } } - if (!_need_commit && !timer_dependency->ready()) { + if (!_need_commit) { get_block_dep->block(); VLOG_DEBUG << "block get_block for query_id=" << load_instance_id; } @@ -254,7 +255,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { } Status GroupCommitTable::get_first_block_load_queue( - int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, + int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> create_plan_dep, @@ -270,7 +271,8 @@ Status GroupCommitTable::get_first_block_load_queue( } for (const auto& [_, inner_block_queue] : _load_block_queues) { if (!inner_block_queue->need_commit()) { - if (base_schema_version == inner_block_queue->schema_version) { + if (base_schema_version == inner_block_queue->schema_version && + index_size == inner_block_queue->index_size) { if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) { load_block_queue = inner_block_queue; return Status::OK(); @@ -290,8 +292,8 @@ Status GroupCommitTable::get_first_block_load_queue( return Status::OK(); } create_plan_dep->block(); - _create_plan_deps.emplace(load_id, - std::make_tuple(create_plan_dep, put_block_dep, base_schema_version)); + _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, put_block_dep, + base_schema_version, index_size)); if (!_is_creating_plan_fragment) { _is_creating_plan_fragment = true; RETURN_IF_ERROR( @@ -378,18 +380,21 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, LOG(WARNING) << "create group commit load error, st=" << st.to_string(); return st; } - auto schema_version = result.base_schema_version; auto& pipeline_params = result.pipeline_params; + auto schema_version = pipeline_params.fragment.output_sink.olap_table_sink.schema.version; + auto index_size = + pipeline_params.fragment.output_sink.olap_table_sink.schema.indexes.size(); DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id); txn_id = pipeline_params.txn_conf.txn_id; DCHECK(pipeline_params.local_params.size() == 1); instance_id = pipeline_params.local_params[0].fragment_instance_id; VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id - << ", schema version=" << schema_version << ", label=" << label - << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id); + << ", schema version=" << schema_version << ", index size=" << index_size + << ", label=" << label << ", txn_id=" << txn_id + << ", instance_id=" << print_id(instance_id); { auto load_block_queue = std::make_shared<LoadBlockQueue>( - instance_id, label, txn_id, schema_version, _all_block_queues_bytes, + instance_id, label, txn_id, schema_version, index_size, _all_block_queues_bytes, result.wait_internal_group_commit_finish, result.group_commit_interval_ms, result.group_commit_data_bytes); RETURN_IF_ERROR(load_block_queue->create_wal( @@ -403,7 +408,8 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, for (const auto& [id, load_info] : _create_plan_deps) { auto create_dep = std::get<0>(load_info); auto put_dep = std::get<1>(load_info); - if (load_block_queue->schema_version == std::get<2>(load_info)) { + if (load_block_queue->schema_version == std::get<2>(load_info) && + load_block_queue->index_size == std::get<3>(load_info)) { if (load_block_queue->add_load_id(id, put_dep).ok()) { create_dep->set_ready(); success_load_ids.emplace_back(id); @@ -624,9 +630,9 @@ void GroupCommitMgr::stop() { } Status GroupCommitMgr::get_first_block_load_queue( - int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, - std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, - std::shared_ptr<MemTrackerLimiter> mem_tracker, + int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t index_size, + const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, + int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> create_plan_dep, std::shared_ptr<pipeline::Dependency> put_block_dep) { std::shared_ptr<GroupCommitTable> group_commit_table; @@ -640,8 +646,8 @@ Status GroupCommitMgr::get_first_block_load_queue( group_commit_table = _table_map[table_id]; } RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( - table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker, - create_plan_dep, put_block_dep)); + table_id, base_schema_version, index_size, load_id, load_block_queue, be_exe_version, + mem_tracker, create_plan_dep, put_block_dep)); return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 32579547893..2be17400026 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -55,7 +55,7 @@ struct BlockData { class LoadBlockQueue { public: LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id, - int64_t schema_version, + int64_t schema_version, int64_t index_size, std::shared_ptr<std::atomic_size_t> all_block_queues_bytes, bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms, int64_t group_commit_data_bytes) @@ -63,6 +63,7 @@ public: label(label), txn_id(txn_id), schema_version(schema_version), + index_size(index_size), wait_internal_group_commit_finish(wait_internal_group_commit_finish), _group_commit_interval_ms(group_commit_interval_ms), _start_time(std::chrono::steady_clock::now()), @@ -108,6 +109,7 @@ public: std::string label; int64_t txn_id; int64_t schema_version; + int64_t index_size; bool wait_internal_group_commit_finish = false; bool data_size_condition = false; @@ -157,7 +159,7 @@ public: _db_id(db_id), _table_id(table_id) {}; Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version, - const UniqueId& load_id, + int64_t index_size, const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, @@ -189,9 +191,10 @@ private: // fragment_instance_id to load_block_queue std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues; bool _is_creating_plan_fragment = false; - // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version> - std::unordered_map<UniqueId, std::tuple<std::shared_ptr<pipeline::Dependency>, - std::shared_ptr<pipeline::Dependency>, int64_t>> + // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version, index_size> + std::unordered_map<UniqueId, + std::tuple<std::shared_ptr<pipeline::Dependency>, + std::shared_ptr<pipeline::Dependency>, int64_t, int64_t>> _create_plan_deps; }; @@ -207,7 +210,7 @@ public: std::shared_ptr<LoadBlockQueue>& load_block_queue, std::shared_ptr<pipeline::Dependency> get_block_dep); Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version, - const UniqueId& load_id, + int64_t index_size, const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index ea53a931131..b3136b41b04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -48,6 +48,7 @@ import org.apache.doris.common.SchemaVersionAndHash; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; @@ -377,6 +378,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 { // create all replicas success. // add all shadow indexes to catalog + while (DebugPointUtil.isEnable("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block")) { + try { + Thread.sleep(1000); + LOG.info("block addShadowIndexToCatalog for job: {}", jobId); + } catch (InterruptedException e) { + LOG.warn("InterruptedException: ", e); + } + } tbl.writeLockOrAlterCancelException(); try { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); @@ -609,6 +618,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } return; } + while (DebugPointUtil.isEnable("FE.SchemaChangeJobV2.runRunning.block")) { + try { + Thread.sleep(1000); + LOG.info("block schema change for job: {}", jobId); + } catch (InterruptedException e) { + LOG.warn("InterruptedException: ", e); + } + } Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId); Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index bbf8555f0fd..9140b033f5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2174,6 +2174,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { String originStmt = request.getLoadSql(); HttpStreamParams httpStreamParams; try { + while (DebugPointUtil.isEnable("FE.FrontendServiceImpl.initHttpStreamPlan.block")) { + Thread.sleep(1000); + LOG.info("block initHttpStreamPlan"); + } StmtExecutor executor = new StmtExecutor(ctx, originStmt); ctx.setExecutor(executor); httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId()); diff --git a/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out b/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out new file mode 100644 index 00000000000..3747bce7722 Binary files /dev/null and b/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out differ diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy index 7f785a3292f..6e9a89aa0f7 100644 --- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy @@ -73,4 +73,23 @@ suite("test_group_commit_error", "nonConcurrent") { } finally { GetDebugPoint().clearDebugPointsForAllBEs() } + + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.block") + Thread thread = new Thread(() -> { + sql """ set group_commit = async_mode """ + sql """ insert into ${tableName} values (5, 4) """ + }) + thread.start() + sleep(4000) + GetDebugPoint().clearDebugPointsForAllBEs() + thread.join() + def result = sql "select count(*) from ${tableName}" + logger.info("rowCount 0: ${result}") + } catch (Exception e) { + logger.warn("unexpected failed: " + e.getMessage()) + assertTrue(false, "unexpected failed: " + e.getMessage()) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } } \ No newline at end of file diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy index 2858d1e4f51..0fbc3ec0a8d 100644 --- a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy @@ -37,6 +37,10 @@ suite("test_group_commit_replay_wal", "nonConcurrent") { `k` int , `v` int , ) engine=olap + PARTITION BY LIST(k) ( + PARTITION p1 VALUES IN ("1","2","3","4"), + PARTITION p2 VALUES IN ("5") + ) DISTRIBUTED BY HASH(`k`) BUCKETS 5 properties("replication_num" = "1", "group_commit_interval_ms"="2000") @@ -86,9 +90,18 @@ suite("test_group_commit_replay_wal", "nonConcurrent") { sleep(4000) // wal replay but all failed getRowCount(5) // check wal count is 1 + sql """ ALTER TABLE ${tableName} DROP PARTITION p2 """ + for (int i = 0; i < 10; i++) { + List<List<Object>> partitions = sql "show partitions from ${tableName};" + logger.info("partitions: ${partitions}") + if (partitions.size() == 1) { + break + } + sleep(100) + } GetDebugPoint().clearDebugPointsForAllFEs() - getRowCount(10) + getRowCount(8) // check wal count is 0 } catch (Exception e) { logger.info("failed: " + e.getMessage()) diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy new file mode 100644 index 00000000000..06bbbebef5b --- /dev/null +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovyjarjarantlr4.v4.codegen.model.ExceptionClause + +import java.util.Date +import java.text.SimpleDateFormat +import org.apache.http.HttpResponse +import org.apache.http.client.methods.HttpPut +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.RedirectStrategy +import org.apache.http.protocol.HttpContext +import org.apache.http.HttpRequest +import org.apache.http.impl.client.LaxRedirectStrategy +import org.apache.http.client.methods.RequestBuilder +import org.apache.http.entity.StringEntity +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.util.EntityUtils +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_group_commit_schema_change", "nonConcurrent") { + def tableName3 = "test_group_commit_schema_change" + + onFinish { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + logger.info("jobStateResult: ${jobStateResult}") + return jobStateResult[0][9] + } + + def getRowCount = { expectedRowCount -> + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until( + { + def result = sql "select count(*) from ${tableName3}" + logger.info("table: ${tableName3}, rowCount: ${result}") + return result[0][0] == expectedRowCount + } + ) + } + + GetDebugPoint().clearDebugPointsForAllFEs() + sql """ DROP TABLE IF EXISTS ${tableName3} """ + sql """ + CREATE TABLE ${tableName3} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` varchar(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "200" + ); + """ + + GetDebugPoint().enableDebugPointForAllFEs("FE.FrontendServiceImpl.initHttpStreamPlan.block") + GetDebugPoint().enableDebugPointForAllFEs("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block") + + // write data + Thread thread = new Thread(() -> { + sql """ set group_commit = async_mode; """ + for (int i = 0; i < 10; i++) { + try { + sql """ insert into ${tableName3} values (1, 'a', 100) """ + break + } catch (Exception e) { + logger.info("insert error: ${e}") + if (e.getMessage().contains("schema version not match")) { + continue + } else { + throw e + } + } + } + }) + thread.start() + sleep(1000) + def result = sql "select count(*) from ${tableName3}" + logger.info("rowCount 0: ${result}") + assertEquals(0, result[0][0]) + + // schema change + sql """ alter table ${tableName3} modify column score int NULL""" + GetDebugPoint().enableDebugPointForAllFEs("FE.SchemaChangeJobV2.runRunning.block") + GetDebugPoint().disableDebugPointForAllFEs("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block") + for (int i = 0; i < 10; i++) { + def job_state = getJobState(tableName3) + if (job_state == "RUNNING") { + break + } + sleep(100) + } + + GetDebugPoint().disableDebugPointForAllFEs("FE.FrontendServiceImpl.initHttpStreamPlan.block") + thread.join() + getRowCount(1) + qt_sql """ select id, name, score from ${tableName3} """ + def job_state = getJobState(tableName3) + assertEquals("RUNNING", job_state) + GetDebugPoint().disableDebugPointForAllFEs("FE.SchemaChangeJobV2.runRunning.block") + for (int i = 0; i < 10; i++) { + job_state = getJobState(tableName3) + if (job_state == "FINISHED") { + break + } + sleep(100) + } + assertEquals("FINISHED", job_state) +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org