This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0c734a861e [Enhancement](delete) eliminate reading the old values of non-key columns for delete stmt (#22270) 0c734a861e is described below commit 0c734a861e0c72f37a283c4ad5ae2bcd2a7e0330 Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Fri Jul 28 14:37:33 2023 +0800 [Enhancement](delete) eliminate reading the old values of non-key columns for delete stmt (#22270) --- be/src/olap/rowset/segment_v2/segment_writer.cpp | 37 ++++++-- .../apache/doris/planner/StreamLoadPlanner.java | 11 +++ .../delete/delete_mow_partial_update.out | 27 +++++- .../nereids_p0/delete/partial_update_delete.csv | 3 + .../partial_update/partial_update_delete.csv | 3 + .../partial_update/test_partial_update_delete.out | 43 ++++++++++ .../delete/delete_mow_partial_update.groovy | 40 ++++++++- .../test_partial_update_delete.groovy | 99 ++++++++++++++++++++++ 8 files changed, 253 insertions(+), 10 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index be1330cb4a..edc8c43a10 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -55,6 +55,7 @@ #include "vec/common/schema_util.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" #include "vec/io/reader_buffer.h" #include "vec/jsonb/serialize.h" #include "vec/olap/olap_data_convertor.h" @@ -364,6 +365,17 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* bool has_default_or_nullable = false; std::vector<bool> use_default_or_null_flag; use_default_or_null_flag.reserve(num_rows); + const vectorized::Int8* delete_sign_column_data = nullptr; + if (const vectorized::ColumnWithTypeAndName* delete_sign_column = + full_block.try_get_by_name(DELETE_SIGN); + delete_sign_column != nullptr) { + auto& delete_sign_col = + reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column)); + if (delete_sign_col.size() == num_rows) { + delete_sign_column_data = delete_sign_col.get_data().data(); + } + } + std::vector<RowsetSharedPtr> specified_rowsets; { std::shared_lock rlock(_tablet->get_header_lock()); @@ -412,10 +424,19 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* LOG(WARNING) << "failed to lookup row key, error: " << st; return st; } - // partial update should not contain invisible columns - use_default_or_null_flag.emplace_back(false); - _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); - _tablet->prepare_to_read(loc, pos, &_rssid_to_rid); + + // if the delete sign is marked, it means that the value columns of the row + // will not be read. So we don't need to read the missing values from the previous rows. + // But we still need to mark the previous row on delete bitmap + if (delete_sign_column_data != nullptr && delete_sign_column_data[pos - row_pos] != 0) { + has_default_or_nullable = true; + use_default_or_null_flag.emplace_back(true); + } else { + // partial update should not contain invisible columns + use_default_or_null_flag.emplace_back(false); + _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); + _tablet->prepare_to_read(loc, pos, &_rssid_to_rid); + } if (st.is<ALREADY_EXIST>()) { // although we need to mark delete current row, we still need to read missing columns @@ -551,14 +572,18 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f // if the column has default value, fiil it with default value // otherwise, if the column is nullable, fill it with null value const auto& tablet_column = _tablet_schema->column(cids_missing[i]); - CHECK(tablet_column.has_default_value() || tablet_column.is_nullable()); if (tablet_column.has_default_value()) { mutable_full_columns[cids_missing[i]]->insert_from( *mutable_default_value_columns[i].get(), 0); - } else { + } else if (tablet_column.is_nullable()) { auto 
nullable_column = assert_cast<vectorized::ColumnNullable*>( mutable_full_columns[cids_missing[i]].get()); nullable_column->insert_null_elements(1); + } else { + // If the control flow reaches this branch, the column neither has default value + // nor is nullable. It means that the row's delete sign is marked, and the value + // columns are useless and won't be read. So we can just put arbitrary values in the cells + mutable_full_columns[cids_missing[i]]->insert_default(); } } continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index ac5d6c595c..ccdd7b336e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -171,6 +171,9 @@ public class StreamLoadPlanner { throw new UserException("Partial update should include all key columns, missing: " + col.getName()); } } + if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) { + partialUpdateInputColumns.add(Column.DELETE_SIGN); + } } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { @@ -373,6 +376,11 @@ public class StreamLoadPlanner { + col.getName()); } partialUpdateInputColumns.add(col.getName()); + if (destTable.hasSequenceCol() && (taskInfo.hasSequenceCol() || ( + destTable.getSequenceMapCol() != null + && destTable.getSequenceMapCol().equalsIgnoreCase(col.getName())))) { + partialUpdateInputColumns.add(Column.SEQUENCE_COL); + } existInExpr = true; break; } @@ -381,6 +389,9 @@ public class StreamLoadPlanner { throw new UserException("Partial update should include all key columns, missing: " + col.getName()); } } + if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) { + partialUpdateInputColumns.add(Column.DELETE_SIGN); + } } // here we should be full schema to fill the descriptor table for (Column col : 
destTable.getFullSchema()) { diff --git a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out index 0b7c6bf68e..2f8e157a94 100644 --- a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out +++ b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out @@ -15,12 +15,33 @@ 5 5 -- !sql -- +1 \N 1 1 1 0 -1 1 1 +2 \N 1 2 2 0 -2 2 1 +3 \N 1 3 3 0 -3 3 1 4 4 0 5 5 0 +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +1 \N \N \N \N 1 +1 1 1 1 1 0 +2 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +3 3 3 3 3 0 +4 4 4 4 4 0 +5 5 5 5 5 0 + diff --git a/regression-test/data/nereids_p0/delete/partial_update_delete.csv b/regression-test/data/nereids_p0/delete/partial_update_delete.csv new file mode 100644 index 0000000000..5f5fbe759f --- /dev/null +++ b/regression-test/data/nereids_p0/delete/partial_update_delete.csv @@ -0,0 +1,3 @@ +1 +2 +3 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv new file mode 100644 index 0000000000..5f5fbe759f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv @@ -0,0 +1,3 @@ +1 +2 +3 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out new file mode 100644 index 0000000000..89faa7fed0 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out @@ -0,0 +1,43 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +4 4 4 4 4 +5 5 5 5 5 + +-- !with_delete_sign -- +1 \N \N \N \N 1 +1 1 1 1 1 0 +2 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +3 3 3 3 3 0 +4 4 4 4 4 0 +5 5 5 5 5 0 + +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +1 \N \N \N \N 1 +1 1 1 1 1 0 +2 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +3 3 3 3 3 0 +4 4 4 4 4 0 +5 5 5 5 5 0 + diff --git a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy index b70bfc2986..7bfc06120a 100644 --- a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy +++ b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy @@ -60,5 +60,43 @@ suite('nereids_delete_mow_partial_update') { sql "set skip_delete_sign=true;" sql "set skip_storage_engine_merge=true;" sql "set skip_delete_bitmap=true;" - qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid;" + qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid,v1,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName3 = "test_partial_update_delete3" + sql "DROP TABLE IF EXISTS ${tableName3};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + streamLoad { + table "${tableName3}" + + 
set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1' + set 'partial_colunms', 'true' + set 'merge_type', 'DELETE' + + file 'partial_update_delete.csv' + time 10000 + } + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy new file mode 100644 index 0000000000..4022bb6d98 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite('test_partial_update_delete') { + sql 'set enable_nereids_planner=false' + sql "set experimental_enable_nereids_planner=false;" + sql 'set enable_nereids_dml=false' + + def tableName1 = "test_partial_update_delete1" + sql "DROP TABLE IF EXISTS ${tableName1};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + + def tableName2 = "test_partial_update_delete2" + sql "DROP TABLE IF EXISTS ${tableName2};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( + `k` BIGINT NULL + ) UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + + sql "insert into ${tableName1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select * from ${tableName1} order by k1;" + sql "insert into ${tableName2} values(1),(2),(3);" + sql "delete from ${tableName1} A using ${tableName2} B where A.k1=B.k;" + qt_sql "select * from ${tableName1} order by k1;" + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName3 = "test_partial_update_delete3" + sql "DROP TABLE IF EXISTS ${tableName3};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = 
"true", + "replication_num" = "1" + );""" + sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + streamLoad { + table "${tableName3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1' + set 'partial_colunms', 'true' + set 'merge_type', 'DELETE' + + file 'partial_update_delete.csv' + time 10000 + } + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org