This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0be5331b282 [Fix](Variant) fix variant schema change may cause invalid block schema and write missing blocks #36317 (#36536) 0be5331b282 is described below commit 0be5331b282e169b72d4221f21b724e235266ba7 Author: lihangyu <15605149...@163.com> AuthorDate: Wed Jun 19 19:09:16 2024 +0800 [Fix](Variant) fix variant schema change may cause invalid block schema and write missing blocks #36317 (#36536) --- be/src/olap/rowset/segment_creator.cpp | 10 +++++++--- be/src/olap/rowset/segment_v2/hierarchical_data_reader.h | 8 ++++++-- be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp | 3 ++- .../schema_change/test_double_write_when_schema_change.groovy | 5 ++++- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 126a6548be5..b968f684855 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -390,14 +390,15 @@ Status SegmentCreator::add_block(const vectorized::Block* block) { size_t block_row_num = block->rows(); size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num); size_t row_offset = 0; - if (_segment_flusher.need_buffering()) { + RETURN_IF_ERROR(_buffer_block.merge(*block)); if (_buffer_block.allocated_bytes() > config::write_buffer_size) { + LOG(INFO) << "directly flush a single block " << _buffer_block.rows() << " rows" + << ", block size " << _buffer_block.bytes() << " block allocated_size " + << _buffer_block.allocated_bytes(); vectorized::Block block = _buffer_block.to_block(); RETURN_IF_ERROR(flush_single_block(&block)); _buffer_block.clear(); - } else { - RETURN_IF_ERROR(_buffer_block.merge(*block)); } return Status::OK(); } @@ -426,6 +427,9 @@ Status SegmentCreator::add_block(const vectorized::Block* block) { Status SegmentCreator::flush() { if (_buffer_block.rows() > 0) { vectorized::Block block = 
_buffer_block.to_block(); + LOG(INFO) << "directly flush a single block " << block.rows() << " rows" + << ", block size " << block.bytes() << " block allocated_size " + << block.allocated_bytes(); RETURN_IF_ERROR(flush_single_block(&block)); _buffer_block.clear(); } diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index a3ac277586c..67f78651416 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -163,8 +163,12 @@ private: type->to_string(container_variant, i, write_buffer); write_buffer.commit(); } - CHECK(variant.empty()); - variant.create_root(std::make_shared<vectorized::DataTypeString>(), std::move(col_to)); + if (variant.empty()) { + variant.create_root(std::make_shared<vectorized::DataTypeString>(), + std::move(col_to)); + } else { + variant.get_root()->insert_range_from(*col_to, 0, col_to->size()); + } } else { // TODO select v:b -> v.b / v.b.c but v.d maybe in v // copy container variant to dst variant, todo avoid copy diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 26bf6f6ca2e..394f5bae184 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -718,8 +718,9 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t } else if (block->columns() != _tablet_schema->num_columns()) { return Status::InternalError( "illegal block columns, block columns = {}, tablet_schema columns = {}", - block->columns(), _tablet_schema->num_columns()); + block->dump_structure(), _tablet_schema->dump_structure()); } + LOG(INFO) << "add a single block " << block->rows(); _batched_blocks.emplace_back(block, row_pos, num_rows); return Status::OK(); } diff --git 
a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy index 91e94fcc40a..ecfd5ff98db 100644 --- a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy +++ b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("double_write_schema_change_with_variant") { +suite("double_write_schema_change_with_variant", "nonConcurrent") { def set_be_config = { key, value -> String backend_id; def backendId_to_backendIP = [:] @@ -70,6 +70,7 @@ suite("double_write_schema_change_with_variant") { """ set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296") + set_be_config.call("write_buffer_size", "10240") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""") @@ -112,5 +113,7 @@ suite("double_write_schema_change_with_variant") { // createMV("create materialized view xxx as select k, sum(k) from ${table_name} group by k order by k;") // qt_sql "select v['type'], v['id'], v['created_at'] from ${table_name} where cast(v['id'] as bigint) != 25061216922 order by k, cast(v['id'] as bigint) limit 10" + // restore configs set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "2147483648") + set_be_config.call("write_buffer_size", "209715200") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org