This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch cp_1118_2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2aa3931e8990811aec7d84df39e78b1db34a7ddc Author: Pxl <pxl...@qq.com> AuthorDate: Wed Nov 6 14:11:22 2024 +0800 [Bug](outfile) fix missing result when parallel outfile enabled (#43205) ### What problem does this PR solve? Fix missing results when parallel outfile is enabled. Problem Summary: add the missing logic that uses a dependency (dep) to control when EOS is sent --- be/src/pipeline/exec/operator.h | 4 ++-- be/src/pipeline/exec/result_file_sink_operator.cpp | 1 + regression-test/suites/export_p0/test_outfile.groovy | 15 ++++++++++----- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index be54b7c4999..301f7599737 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -839,9 +839,9 @@ public: template <typename Writer, typename Parent> requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) -class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> { +class AsyncWriterSink : public PipelineXSinkLocalState<BasicSharedState> { public: - using Base = PipelineXSinkLocalState<FakeSharedState>; + using Base = PipelineXSinkLocalState<BasicSharedState>; AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state), _async_writer_dependency(nullptr) { _finish_dependency = diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 0c16ae115af..252984226f6 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -97,6 +97,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(), state->batch_size())); } + _sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this()); // create writer _writer.reset(new (std::nothrow) vectorized::VFileResultWriter( diff --git 
a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy index 8b60803e185..56ede909514 100644 --- a/regression-test/suites/export_p0/test_outfile.groovy +++ b/regression-test/suites/export_p0/test_outfile.groovy @@ -201,17 +201,22 @@ suite("test_outfile") { `name` varchar(30) ) ENGINE=OLAP DUPLICATE KEY(`id`) - DISTRIBUTED BY HASH(`id`) BUCKETS 2 + DISTRIBUTED BY HASH(`id`) BUCKETS 16 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );""" sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"), (4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"), - (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");""" + (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd"),(1, "b"),(2, "z"),(3, "a"), + (44, "c"), (55, "睿"), (66, "多"), (77, "丝"), (88, "test"), + (1000, "aa"), (1111, "bb"), (1234, "cc"), (2222, "dd");""" sql "set enable_parallel_outfile = true;" - sql """select * from select_into_file into outfile "file://${outFilePath}/";""" - - sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");""" + sql "set parallel_pipeline_task_num=4;" + def result = sql """select * from select_into_file into outfile "file://${outFilePath}/";""" + assertEquals(4, result.size()) + sql "set parallel_pipeline_task_num=8;" + result = sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");""" + assertEquals(8, result.size()) } finally { try_sql("DROP TABLE IF EXISTS select_into_file") File path = new File(outFilePath) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org