This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0be5331b282 [Fix](Variant) fix variant schema change may cause invalid block schema and write missing blocks #36317 (#36536)
0be5331b282 is described below

commit 0be5331b282e169b72d4221f21b724e235266ba7
Author: lihangyu <15605149...@163.com>
AuthorDate: Wed Jun 19 19:09:16 2024 +0800

    [Fix](Variant) fix variant schema change may cause invalid block schema and write missing blocks #36317 (#36536)
---
 be/src/olap/rowset/segment_creator.cpp                         | 10 +++++++---
 be/src/olap/rowset/segment_v2/hierarchical_data_reader.h       |  8 ++++++--
 be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp      |  3 ++-
 .../schema_change/test_double_write_when_schema_change.groovy  |  5 ++++-
 4 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index 126a6548be5..b968f684855 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -390,14 +390,15 @@ Status SegmentCreator::add_block(const vectorized::Block* block) {
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
-
     if (_segment_flusher.need_buffering()) {
+        RETURN_IF_ERROR(_buffer_block.merge(*block));
         if (_buffer_block.allocated_bytes() > config::write_buffer_size) {
+            LOG(INFO) << "directly flush a single block " << _buffer_block.rows() << " rows"
+                      << ", block size " << _buffer_block.bytes() << " block allocated_size "
+                      << _buffer_block.allocated_bytes();
             vectorized::Block block = _buffer_block.to_block();
             RETURN_IF_ERROR(flush_single_block(&block));
             _buffer_block.clear();
-        } else {
-            RETURN_IF_ERROR(_buffer_block.merge(*block));
         }
         return Status::OK();
     }
@@ -426,6 +427,9 @@ Status SegmentCreator::add_block(const vectorized::Block* block) {
 Status SegmentCreator::flush() {
     if (_buffer_block.rows() > 0) {
         vectorized::Block block = _buffer_block.to_block();
+        LOG(INFO) << "directly flush a single block " << block.rows() << " rows"
+                  << ", block size " << block.bytes() << " block allocated_size "
+                  << block.allocated_bytes();
         RETURN_IF_ERROR(flush_single_block(&block));
         _buffer_block.clear();
     }
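
The substance of the fix is in the hunks above: an incoming block is now merged into _buffer_block unconditionally, and the write_buffer_size check only decides when the accumulated rows are flushed. Before the fix, a block that arrived while the buffer was already over the threshold was neither merged nor flushed, so its rows were silently lost. A minimal standalone sketch of the merge-then-flush pattern (Buffer, flush(), and the byte accounting are illustrative stand-ins, not the Doris types):

    #include <cstddef>
    #include <vector>

    // Stand-in for the staging buffer that accumulates incoming row batches.
    struct Buffer {
        std::vector<char> data;
        size_t allocated_bytes() const { return data.capacity(); }
        void merge(const std::vector<char>& block) {
            data.insert(data.end(), block.begin(), block.end());
        }
        void clear() {
            data.clear();
            data.shrink_to_fit();
        }
    };

    // Stand-in for writing the buffered rows out as a single segment.
    void flush(const Buffer& buf) {}

    // Merge first, then flush: every incoming block lands in the buffer, and
    // the threshold only controls when the buffered rows are written out.
    void add_block(Buffer& buf, const std::vector<char>& block, size_t threshold) {
        buf.merge(block);  // pre-fix, this was skipped when already over threshold
        if (buf.allocated_bytes() > threshold) {
            flush(buf);
            buf.clear();
        }
    }
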
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index a3ac277586c..67f78651416 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -163,8 +163,12 @@ private:
                 type->to_string(container_variant, i, write_buffer);
                 write_buffer.commit();
             }
-            CHECK(variant.empty());
-            variant.create_root(std::make_shared<vectorized::DataTypeString>(), std::move(col_to));
+            if (variant.empty()) {
+                variant.create_root(std::make_shared<vectorized::DataTypeString>(),
+                                    std::move(col_to));
+            } else {
+                variant.get_root()->insert_range_from(*col_to, 0, col_to->size());
+            }
         } else {
             // TODO select v:b -> v.b / v.b.c but v.d maybe in v
             // copy container variant to dst variant, todo avoid copy
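
The reader-side hunk replaces a hard CHECK with a create-or-append branch: the first batch materializes the variant's root column, and subsequent batches append to it instead of tripping the assertion. A standalone sketch of that branch, with hypothetical stand-in types for the string column and the variant column (only the names create_root and insert_range_from mirror the diff):

    #include <cstddef>
    #include <memory>
    #include <string>
    #include <vector>

    // Illustrative stand-in for a string column.
    struct Column {
        std::vector<std::string> vals;
        size_t size() const { return vals.size(); }
    };

    // Illustrative stand-in for a variant column holding a root subcolumn.
    struct VariantColumn {
        std::unique_ptr<Column> root;
        bool empty() const { return root == nullptr; }
        void create_root(std::unique_ptr<Column> col) { root = std::move(col); }
        void insert_range_from(const Column& src, size_t start, size_t n) {
            root->vals.insert(root->vals.end(), src.vals.begin() + start,
                              src.vals.begin() + start + n);
        }
    };

    // Pre-fix this site was CHECK(variant.empty()), so filling the same
    // destination column a second time aborted; now the rows are appended.
    void fill_root(VariantColumn& variant, std::unique_ptr<Column> col_to) {
        if (variant.empty()) {
            variant.create_root(std::move(col_to));
        } else {
            variant.insert_range_from(*col_to, 0, col_to->size());
        }
    }
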
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 26bf6f6ca2e..394f5bae184 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -718,8 +718,9 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
     } else if (block->columns() != _tablet_schema->num_columns()) {
         return Status::InternalError(
                 "illegal block columns, block columns = {}, tablet_schema 
columns = {}",
-                block->columns(), _tablet_schema->num_columns());
+                block->dump_structure(), _tablet_schema->dump_structure());
     }
+    LOG(INFO) << "add a single block " << block->rows();
     _batched_blocks.emplace_back(block, row_pos, num_rows);
     return Status::OK();
 }
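
The writer-side hunk is diagnostic: on a column-count mismatch the InternalError now carries full structure dumps of the block and the tablet schema instead of two bare counts, so an invalid block schema produced during a variant schema change shows up in the error text itself. A rough sketch of the same idea under stand-in types (Schema, dump_structure(), and the exception are assumptions, not the Doris API):

    #include <cstddef>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Stand-in: a schema is just an ordered list of column names here.
    struct Schema {
        std::vector<std::string> column_names;
        size_t columns() const { return column_names.size(); }
        std::string dump_structure() const {
            std::string s;
            for (const auto& name : column_names) {
                if (!s.empty()) s += ", ";
                s += name;
            }
            return s;
        }
    };

    // Two bare integers only say the schemas diverged; the dumps say where.
    void check_block(const Schema& block, const Schema& tablet_schema) {
        if (block.columns() != tablet_schema.columns()) {
            throw std::runtime_error("illegal block columns, block schema = {" +
                                     block.dump_structure() + "}, tablet schema = {" +
                                     tablet_schema.dump_structure() + "}");
        }
    }
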
diff --git a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
index 91e94fcc40a..ecfd5ff98db 100644
--- a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
+++ b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("double_write_schema_change_with_variant") {
+suite("double_write_schema_change_with_variant", "nonConcurrent") {
     def set_be_config = { key, value ->
         String backend_id;
         def backendId_to_backendIP = [:]
@@ -70,6 +70,7 @@ suite("double_write_schema_change_with_variant") {
     """
 
     set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296")
+    set_be_config.call("write_buffer_size", "10240")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
@@ -112,5 +113,7 @@ suite("double_write_schema_change_with_variant") {
 
     // createMV("create materialized view xxx as select k, sum(k) from ${table_name} group by k order by k;")
     // qt_sql "select v['type'], v['id'], v['created_at'] from ${table_name} where cast(v['id'] as bigint) != 25061216922 order by k,  cast(v['id'] as bigint) limit 10"
+    // restore configs
     set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "2147483648")
+    set_be_config.call("write_buffer_size", "209715200")
 }

