This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new bcd01c5f242 branch-2.1-pick: [Fix](merge-on-write) should calculate delete bitmaps between segments before skip if tablet is in `NOT_READY` state in flush phase (#48056) (#48089) bcd01c5f242 is described below commit bcd01c5f2425c8bd9a00392e6c893b5e77d0fb2e Author: bobhan1 <bao...@selectdb.com> AuthorDate: Fri Feb 21 19:53:17 2025 +0800 branch-2.1-pick: [Fix](merge-on-write) should calculate delete bitmaps between segments before skip if tablet is in `NOT_READY` state in flush phase (#48056) (#48089) pick https://github.com/apache/doris/pull/48056 --- be/src/olap/memtable.cpp | 2 + be/src/olap/rowset_builder.cpp | 15 +- .../test_skip_calc_between_segments.out | Bin 0 -> 175 bytes .../org/apache/doris/regression/suite/Suite.groovy | 68 +++++++++ .../test_skip_calc_between_segments.groovy | 153 +++++++++++++++++++++ 5 files changed, 231 insertions(+), 7 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index ec64406eb31..856cba6a1f5 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -35,6 +35,7 @@ #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "tablet_meta.h" +#include "util/debug_points.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/aggregate_functions/aggregate_function_reader.h" @@ -486,6 +487,7 @@ void MemTable::shrink_memtable_by_agg() { } bool MemTable::need_flush() const { + DBUG_EXECUTE_IF("MemTable.need_flush", { return true; }); auto max_size = config::write_buffer_size; if (_is_partial_update) { auto update_columns_size = _num_columns; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index c668df4bd33..ddb55775965 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -255,13 +255,6 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { } std::lock_guard<std::mutex> l(_lock); 
SCOPED_TIMER(_submit_delete_bitmap_timer); - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (tablet()->tablet_state() == TABLET_NOTREADY) { - LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " - "tablet_id: " - << tablet()->tablet_id() << " txn_id: " << _req.txn_id; - return Status::OK(); - } auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get()); std::vector<segment_v2::SegmentSharedPtr> segments; RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); @@ -271,6 +264,14 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { tablet()->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap)); } + // tablet is under alter process. The delete bitmap will be calculated after conversion. + if (_tablet->tablet_state() == TABLET_NOTREADY) { + LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " + "tablet_id: " + << _tablet->tablet_id() << " txn_id: " << _req.txn_id; + return Status::OK(); + } + // For partial update, we need to fill in the entire row of data, during the calculation // of the delete bitmap. This operation is resource-intensive, and we need to minimize // the number of times it occurs. Therefore, we skip this operation here. 
diff --git a/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out new file mode 100644 index 00000000000..783cb8c5f8f Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out differ diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index c9e057cd7d0..3cd7f89ba09 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1543,6 +1543,74 @@ class Suite implements GroovyInterceptable { } } + def get_be_param = { paramName -> + def ipList = [:] + def portList = [:] + def backendId_to_params = [:] + getBackendIpHttpPort(ipList, portList) + for (String id in ipList.keySet()) { + def beIp = ipList.get(id) + def bePort = portList.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assert code == 0 + assert out.contains(paramName) + // parsing + def resultList = parseJson(out)[0] + assert resultList.size() == 4 + // get original value + def paramValue = resultList[2] + backendId_to_params.put(id, paramValue) + } + logger.info("backendId_to_params: ${backendId_to_params}".toString()) + return backendId_to_params + } + + def set_be_param = { paramName, paramValue -> + def ipList = [:] + def portList = [:] + getBackendIpHttpPort(ipList, portList) + for (String id in ipList.keySet()) { + def beIp = ipList.get(id) + def bePort = portList.get(id) + logger.info("set be_id ${id} ${paramName} to ${paramValue}".toString()) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, 
paramValue)) + assert out.contains("OK") + } + } + + def set_original_be_param = { paramName, backendId_to_params -> + def ipList = [:] + def portList = [:] + getBackendIpHttpPort(ipList, portList) + for (String id in ipList.keySet()) { + def beIp = ipList.get(id) + def bePort = portList.get(id) + def paramValue = backendId_to_params.get(id) + logger.info("set be_id ${id} ${paramName} to ${paramValue}".toString()) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assert out.contains("OK") + } + } + + void setBeConfigTemporary(Map<String, Object> tempConfig, Closure actionSupplier) { + Map<String, Map<String, String>> originConf = Maps.newHashMap() + tempConfig.each{ k, v -> + originConf.put(k, get_be_param(k)) + } + try { + tempConfig.each{ k, v -> set_be_param(k, v)} + actionSupplier() + } catch (Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + originConf.each { k, confs -> + set_original_be_param(k, confs) + } + } + } + void waiteCreateTableFinished(String tableName) { Thread.sleep(2000); String showCreateTable = "SHOW CREATE TABLE ${tableName}" diff --git a/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy new file mode 100644 index 00000000000..a549bbec562 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_skip_calc_between_segments", "nonConcurrent") { + + def table1 = "test_skip_calc_between_segments" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(99999,99999,99999);" + sql "insert into ${table1} values(88888,88888,88888);" + sql "insert into ${table1} values(77777,77777,77777);" + sql "sync;" + qt_sql "select * from ${table1} order by k1;" + + def block_sc = { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block") + } + + def unblock_sc = { + GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block") + } + + def block_publish = { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + + def unblock_publish = { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + + def checkSegmentNum = { rowsetNum, lastRowsetSegmentNum -> + def tablets = sql_return_maparray """ show tablets from ${table1}; """ + logger.info("tablets: ${tablets}") + assertEquals(1, tablets.size()) + String 
compactionUrl = tablets[0]["CompactionStatus"] + def (code, out, err) = curl("GET", compactionUrl) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + assert tabletJson.rowsets.size() == rowsetNum + 1 + def rowset = tabletJson.rowsets.get(tabletJson.rowsets.size() - 1) + logger.info("rowset: ${rowset}") + int start_index = rowset.indexOf("]") + int end_index = rowset.indexOf("DATA") + def segmentNumStr = rowset.substring(start_index + 1, end_index).trim() + logger.info("segmentNumStr: ${segmentNumStr}") + assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr) + } + + // to cause multi segments + def customBeConfig = [ + doris_scanner_row_bytes : 1 + ] + + setBeConfigTemporary(customBeConfig) { + try { + // batch_size is 4164 in csv_reader.cpp + // _batch_size is 8192 in vtablet_writer.cpp + + // to cause multi segments + GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush") + + block_publish() + + // block sc to let load skip to calculate delete bitmap in flush and commit phase + block_sc() + + sql "alter table ${table1} modify column c1 varchar(100);" + + Thread.sleep(3000) + + def t1 = Thread.start { + // load data that will have multi segments and there are duplicate keys between segments + String content = "" + (1..4096).each { + content += "${it},${it},${it}\n" + } + content += content + streamLoad { + table "${table1}" + set 'column_separator', ',' + inputStream new ByteArrayInputStream(content.getBytes()) + time 30000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(8192, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + } + + + Thread.sleep(2000) + + // let sc finish and wait for tablet state to be 
RUNNING + unblock_sc() + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + logger.info("wait for schema change done") + + Thread.sleep(500) + + unblock_publish() + + t1.join() + // ensure that we really write multi segments + checkSegmentNum(4, 3) + + qt_sql "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org