This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 94879459492 [fix](delete-sign) Disable deletion with delete sign when doing cumulative compaction (#37950) 94879459492 is described below commit 94879459492376473b9cc1d02b9c979da0587579 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Fri Jul 19 09:29:32 2024 +0800 [fix](delete-sign) Disable deletion with delete sign when doing cumulative compaction (#37950) ## Proposed changes Delete sign could not be applied when delete on cumu compaction is enabled, because it is meant for delete with predicates. If delete sign is applied on cumu compaction, it will lose effect when doing base compaction. So disable delete sign when doing cumulative compaction. --- be/src/olap/tablet_reader.cpp | 21 ++-- be/src/olap/tablet_reader.h | 1 + be/src/vec/olap/block_reader.cpp | 3 +- be/src/vec/olap/vertical_block_reader.cpp | 2 +- .../test_delete_sign_with_cumu_compaction.out | 5 + .../test_delete_sign_with_cumu_compaction.groovy | 116 +++++++++++++++++++++ 6 files changed, 137 insertions(+), 11 deletions(-) diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index e65a10ac73e..631a041379a 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -640,14 +640,19 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { !config::enable_delete_when_cumu_compaction)) { return Status::OK(); } - // Only BASE_COMPACTION and COLD_DATA_COMPACTION and CUMULATIVE_COMPACTION need set filter_delete = true - // other reader type: - // QUERY will filter the row in query layer to keep right result use where clause.
- _filter_delete = (read_params.reader_type == ReaderType::READER_BASE_COMPACTION || - read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION || - ((read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION && - config::enable_delete_when_cumu_compaction)) || - read_params.reader_type == ReaderType::READER_CHECKSUM); + bool cumu_delete = read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION && + config::enable_delete_when_cumu_compaction; + // Delete sign could not be applied when delete on cumu compaction is enabled, bucause it is meant for delete with predicates. + // If delete design is applied on cumu compaction, it will lose effect when doing base compaction. + // `_delete_sign_available` indicates the condition where we could apply delete signs to data. + _delete_sign_available = (read_params.reader_type == ReaderType::READER_BASE_COMPACTION || + read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION || + read_params.reader_type == ReaderType::READER_CHECKSUM); + + // `_filter_delete` indicates the condition where we should execlude deleted tuples when reading data. + // However, queries will not use this condition but generate special where predicates to filter data. + // (Though a lille bit confused, it is how the current logic working...) + _filter_delete = _delete_sign_available || cumu_delete; auto* runtime_state = read_params.runtime_state; bool enable_sub_pred_v2 = runtime_state == nullptr ? 
true : runtime_state->enable_delete_sub_pred_v2(); diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index c257ba007f5..06c3daa653a 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -295,6 +295,7 @@ protected: // for agg query, we don't need to finalize when scan agg object data ReaderType _reader_type = ReaderType::READER_QUERY; bool _next_delete_flag = false; + bool _delete_sign_available = false; bool _filter_delete = false; int32_t _sequence_col_idx = -1; bool _direct_mode = false; diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index ec6bb5cc92b..a606b83345d 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -377,8 +377,7 @@ Status BlockReader::_unique_key_next_block(Block* block, bool* eof) { } } while (target_block_row < _reader_context.batch_size); - // do filter delete row in base compaction, only base compaction need to do the job - if (_filter_delete) { + if (_delete_sign_available) { int delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN); DCHECK(delete_sign_idx > 0); if (delete_sign_idx <= 0 || delete_sign_idx >= target_columns.size()) { diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 646af24b6b8..98efd767961 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -462,7 +462,7 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) { } size_t block_rows = block->rows(); - if (_filter_delete && block_rows > 0) { + if (_delete_sign_available && block_rows > 0) { int ori_delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN); if (ori_delete_sign_idx < 0) { *eof = (res.is<END_OF_FILE>()); diff --git a/regression-test/data/delete_p0/test_delete_sign_with_cumu_compaction.out b/regression-test/data/delete_p0/test_delete_sign_with_cumu_compaction.out new file mode 100644 
index 00000000000..95f9728f8db --- /dev/null +++ b/regression-test/data/delete_p0/test_delete_sign_with_cumu_compaction.out @@ -0,0 +1,5 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_default -- +11 2 3.0000000000000 +12 2 3.0000000000000 + diff --git a/regression-test/suites/delete_p0/test_delete_sign_with_cumu_compaction.groovy b/regression-test/suites/delete_p0/test_delete_sign_with_cumu_compaction.groovy new file mode 100644 index 00000000000..eca0ed41128 --- /dev/null +++ b/regression-test/suites/delete_p0/test_delete_sign_with_cumu_compaction.groovy @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.awaitility.Awaitility; +import static java.util.concurrent.TimeUnit.SECONDS; + +suite('test_delete_sign_with_cumu_compaction') { + def table = 'test_delete_sign_with_cumu_compaction' + + sql """ DROP TABLE IF EXISTS ${table};""" + sql """ + CREATE TABLE ${table} + ( + col1 TINYINT NOT NULL,col2 BIGINT NOT NULL,col3 DECIMAL(36, 13) NOT NULL, + ) + UNIQUE KEY(`col1`,`col2`,`col3`) + DISTRIBUTED BY HASH(`col1`,`col2`,`col3`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "false", "disable_auto_compaction"="true", + "replication_num" = "1" + ); + """ + + String backend_id; + //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus + def tablet = (sql """ show tablets from ${table}; """)[0] + backend_id = tablet[2] + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + boolean disableAutoCompaction = true + boolean allowDeleteWhenCumu = false + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + } + if (((List<String>) ele)[0] == "enable_delete_when_cumu_compaction") { + allowDeleteWhenCumu = Boolean.parseBoolean(((List<String>) ele)[2]) + } + } + + if (!allowDeleteWhenCumu) { + logger.info("Skip test compaction when cumu compaction because not enabled this config") + return + } + + def waitForCompaction = { 
be_host, be_http_port -> + // wait for all compactions done + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until { + String tablet_id = tablet[0] + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + + !compactionStatus.run_status + } + } + + (1..10).each { i -> + sql """INSERT into ${table} (col1,col2,col3) values (${i}, 2, 3)""" + } + be_run_cumulative_compaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id], tablet[0]); + waitForCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id]) + + (11..12).each { i -> + sql """INSERT into ${table} (col1,col2,col3) values (${i}, 2, 3)""" + } + be_run_cumulative_compaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id], tablet[0]); + waitForCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id]) + + be_run_base_compaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id], tablet[0]); + waitForCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id]) + + (1..10).each { i -> + sql """ INSERT into ${table} (col1,col2,col3,__DORIS_DELETE_SIGN__) values (${i}, 2, 3, 1) """ + } + + be_run_cumulative_compaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id], tablet[0]); + waitForCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id]) + + qt_select_default """ SELECT * FROM ${table} ORDER BY col1 """ + +} 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org