This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b84aefa013f [Bug](outfile) fix missing result when parallel outfile enabled (#43205) (#44184)
b84aefa013f is described below

commit b84aefa013f22272150f3980832d93334328ebf6
Author: Pxl <x...@selectdb.com>
AuthorDate: Mon Nov 18 20:47:22 2024 +0800

    [Bug](outfile) fix missing result when parallel outfile enabled (#43205) (#44184)

    pick from #43205
---
 be/src/pipeline/exec/operator.h                      |  4 ++--
 be/src/pipeline/exec/result_file_sink_operator.cpp   |  1 +
 regression-test/suites/export_p0/test_outfile.groovy | 15 ++++++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index be54b7c4999..301f7599737 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -839,9 +839,9 @@ public:
 
 template <typename Writer, typename Parent>
     requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
-class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> {
+class AsyncWriterSink : public PipelineXSinkLocalState<BasicSharedState> {
 public:
-    using Base = PipelineXSinkLocalState<FakeSharedState>;
+    using Base = PipelineXSinkLocalState<BasicSharedState>;
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _async_writer_dependency(nullptr) {
         _finish_dependency =
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 7c9c38ece5c..bc4e4c88d14 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -95,6 +95,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
                 state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(),
                 state->batch_size()));
     }
+    _sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this());
 
     // create writer
     _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
diff --git a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy
index 8b60803e185..56ede909514 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -201,17 +201,22 @@ suite("test_outfile") {
                 `name` varchar(30)
             ) ENGINE=OLAP
             DUPLICATE KEY(`id`)
-            DISTRIBUTED BY HASH(`id`) BUCKETS 2
+            DISTRIBUTED BY HASH(`id`) BUCKETS 16
             PROPERTIES (
                 "replication_allocation" = "tag.location.default: 1"
             );"""
             sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"),
                     (4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"),
-                    (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
+                    (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd"),(1, "b"),(2, "z"),(3, "a"),
+                    (44, "c"), (55, "睿"), (66, "多"), (77, "丝"), (88, "test"),
+                    (1000, "aa"), (1111, "bb"), (1234, "cc"), (2222, "dd");"""
             sql "set enable_parallel_outfile = true;"
-            sql """select * from select_into_file into outfile "file://${outFilePath}/";"""
-
-            sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+            sql "set parallel_pipeline_task_num=4;"
+            def result = sql """select * from select_into_file into outfile "file://${outFilePath}/";"""
+            assertEquals(4, result.size())
+            sql "set parallel_pipeline_task_num=8;"
+            result = sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+            assertEquals(8, result.size())
         } finally {
             try_sql("DROP TABLE IF EXISTS select_into_file")
             File path = new File(outFilePath)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org