This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 34a1b7599f [Fix](lazy_open) fix lazy open commit info lose (#20404) 34a1b7599f is described below commit 34a1b7599f60ff9f0674bde8ba1da581b0443221 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sun Jun 4 19:08:36 2023 +0800 [Fix](lazy_open) fix lazy open commit info lose (#20404) --- be/src/vec/sink/vtablet_sink.cpp | 23 ++++- be/src/vec/sink/vtablet_sink.h | 2 + .../test_materialized_view_lazy_open.groovy | 103 ++++++++++++++++++++- 3 files changed, 121 insertions(+), 7 deletions(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 9533ad639e..a5711f4c12 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -516,7 +516,8 @@ Status VNodeChannel::open_wait() { } void VNodeChannel::open_partition(int64_t partition_id) { - _timeout_watch.reset(); + MonotonicStopWatch lazy_open_timeout_watch; + lazy_open_timeout_watch.start(); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); OpenPartitionRequest request; auto load_id = std::make_shared<PUniqueId>(_parent->_load_id); @@ -533,7 +534,7 @@ void VNodeChannel::open_partition(int64_t partition_id) { auto open_partition_closure = std::make_unique<OpenPartitionClosure>(this, _index_channel, partition_id); - int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time(); + int remain_ms = _rpc_timeout_ms - lazy_open_timeout_watch.elapsed_time(); if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { remain_ms = config::min_load_rpc_timeout_ms; } @@ -546,6 +547,12 @@ void VNodeChannel::open_partition(int64_t partition_id) { request.release_id(); } +void VNodeChannel::open_partition_wait() { + for (auto& open_partition_closure : _open_partition_closures) { + open_partition_closure->join(); + } +} + Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); if (payload->second.empty()) { @@ -884,9 +891,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { Status VNodeChannel::close_wait(RuntimeState* state) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - for (auto& open_partition_closure : _open_partition_closures) { - open_partition_closure->join(); - } // set _is_closed to true finally Defer set_closed {[&]() { std::lock_guard<std::mutex> l(_closed_lock); @@ -1407,6 +1411,15 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { num_node_channels = 0; VNodeChannelStat channel_stat; { + if (config::enable_lazy_open_partition) { + for (auto index_channel : _channels) { + index_channel->for_each_node_channel( + [](const std::shared_ptr<VNodeChannel>& ch) { + ch->open_partition_wait(); + }); + } + } + for (auto index_channel : _channels) { index_channel->for_each_node_channel( [](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); }); diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index f9edeff693..1e91f4247f 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -230,6 +230,8 @@ public: Status open_wait(); + void open_partition_wait(); + Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false); int try_send_and_fetch_status(RuntimeState* state, diff --git a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy index 064acdd9d4..599141d9c1 100644 --- a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy +++ b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy @@ -22,6 +22,8 @@ suite("test_materialized_view_lazy_open", "rollup") { def tbName1 = "test_materialized_view_lazy_open" def tbName2 = "test_materialized_view_lazy_open_dynamic_partition" + def tbName3 = "test_materialized_view_lazy_open_schema_change" + def tbName4 = "test_materialized_view_lazy_open_dynamic_partition_schema_change" def getJobState = { tableName -> def jobStateResult = sql """ SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """ @@ -44,7 +46,10 @@ suite("test_materialized_view_lazy_open", "rollup") { PARTITION p3 VALUES LESS THAN ("2020-01-01") ) DISTRIBUTED BY HASH(k1) BUCKETS 32 - properties("replication_num" = "1"); + properties( + "light_schema_change" = "false", + "replication_num" = "1" + ); """ sql "DROP TABLE IF EXISTS ${tbName2}" @@ -70,6 +75,58 @@ suite("test_materialized_view_lazy_open", "rollup") { "dynamic_partition.end" = "3", "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "32", + "light_schema_change" = "false", + "replication_num"="1" + ); + """ + + sql "DROP TABLE IF EXISTS ${tbName3}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName3}( + k1 DATE, + k2 DECIMAL(10, 2), + k3 CHAR(10), + k4 INT NOT NULL + ) + DUPLICATE KEY(k1, k2) + PARTITION BY RANGE(k1) + ( + PARTITION p1 VALUES LESS THAN ("2000-01-01"), + PARTITION p2 VALUES LESS THAN ("2010-01-01"), + PARTITION p3 VALUES LESS THAN ("2020-01-01") + ) + DISTRIBUTED BY HASH(k1) BUCKETS 32 + + properties( + "light_schema_change" = "true", + "replication_num" = "1" + ); + """ + + sql "DROP TABLE IF EXISTS ${tbName4}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName4}( + k1 DATE, + k2 DECIMAL(10, 2), + k3 CHAR(10), + k4 INT NOT NULL + ) + PARTITION BY RANGE(k1) + ( + PARTITION p1 VALUES LESS THAN ("2000-01-01"), + PARTITION p2 VALUES LESS THAN ("2010-01-01"), + PARTITION p3 VALUES LESS THAN ("2020-01-01") + ) + DISTRIBUTED BY HASH(k1) + PROPERTIES + ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.start" = "-2147483648", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "32", + "light_schema_change" = "true", "replication_num"="1" ); """ @@ -106,12 +163,54 @@ suite("test_materialized_view_lazy_open", "rollup") { } } + sql "CREATE materialized VIEW test_lazy_open_schema_change AS SELECT k1 FROM ${tbName3} GROUP BY k1;" + max_try_secs = 60 + while (max_try_secs--) { + String res = getJobState(tbName3) + if (res == "FINISHED") { + sleep(3000) + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + + sql "CREATE materialized VIEW test_lazy_open_dynamic_partition_schema_change AS SELECT k1 FROM ${tbName4} GROUP BY k1;" + max_try_secs = 60 + while (max_try_secs--) { + String res = getJobState(tbName4) + if (res == "FINISHED") { + sleep(3000) + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + sql "insert into ${tbName1} values('2000-05-20', 1.5, 'test', 1);" sql "insert into ${tbName1} values('2010-05-20', 1.5, 'test', 1);" + sql "insert into ${tbName2} values('2000-05-20', 1.5, 'test', 1);" sql "insert into ${tbName2} values('2010-05-20', 1.5, 'test', 1);" + sql "insert into ${tbName3} values('2000-05-20', 1.5, 'test', 1);" + sql "ALTER table ${tbName3} ADD COLUMN new_column INT;" + sql "insert into ${tbName3} values('2010-05-20', 1.5, 'test', 1, 1);" + + sql "insert into ${tbName4} values('2000-05-20', 1.5, 'test', 1);" + sql "ALTER table ${tbName4} ADD COLUMN new_column INT;" + sql "insert into ${tbName4} values('2010-05-20', 1.5, 'test', 1, 1);" + sql "DROP TABLE ${tbName1} FORCE;" sql "DROP TABLE ${tbName2} FORCE;" - + sql "DROP TABLE ${tbName3} FORCE;" + sql "DROP TABLE ${tbName4} FORCE;" } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org