This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch cp_1118_2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2aa3931e8990811aec7d84df39e78b1db34a7ddc
Author: Pxl <pxl...@qq.com>
AuthorDate: Wed Nov 6 14:11:22 2024 +0800

    [Bug](outfile) fix missing result when parallel outfile enabled (#43205)
    
    ### What problem does this PR solve?
    Fix missing results when parallel outfile is enabled.
    
    Problem Summary:
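    Add the previously missing logic that uses a dependency to control when EOS
    is sent: ResultFileSinkLocalState::init now registers its dependency with the
    result sender via `_sender->set_dependency(...)`, and AsyncWriterSink now uses
    BasicSharedState instead of FakeSharedState.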
---
 be/src/pipeline/exec/operator.h                      |  4 ++--
 be/src/pipeline/exec/result_file_sink_operator.cpp   |  1 +
 regression-test/suites/export_p0/test_outfile.groovy | 15 ++++++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index be54b7c4999..301f7599737 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -839,9 +839,9 @@ public:
 
 template <typename Writer, typename Parent>
     requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
-class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> {
+class AsyncWriterSink : public PipelineXSinkLocalState<BasicSharedState> {
 public:
-    using Base = PipelineXSinkLocalState<FakeSharedState>;
+    using Base = PipelineXSinkLocalState<BasicSharedState>;
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _async_writer_dependency(nullptr) {
         _finish_dependency =
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 0c16ae115af..252984226f6 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -97,6 +97,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
                 state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(),
                 state->batch_size()));
     }
+    _sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this());
 
     // create writer
     _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
diff --git a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy
index 8b60803e185..56ede909514 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -201,17 +201,22 @@ suite("test_outfile") {
                      `name` varchar(30)
                  ) ENGINE=OLAP
                    DUPLICATE KEY(`id`)
-                   DISTRIBUTED BY HASH(`id`) BUCKETS 2
+                   DISTRIBUTED BY HASH(`id`) BUCKETS 16
                    PROPERTIES (
                    "replication_allocation" = "tag.location.default: 1"
                    );"""
         sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"),
                     (4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"),
-                    (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
+                    (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd"),(1, "b"),(2, "z"),(3, "a"),
+                    (44, "c"), (55, "睿"), (66, "多"), (77, "丝"), (88, "test"),
+                    (1000, "aa"), (1111, "bb"), (1234, "cc"), (2222, "dd");"""
         sql "set enable_parallel_outfile = true;"
-        sql """select * from select_into_file into outfile "file://${outFilePath}/";"""
-
-        sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+        sql "set parallel_pipeline_task_num=4;"
+        def result = sql """select * from select_into_file into outfile "file://${outFilePath}/";"""
+        assertEquals(4, result.size())
+        sql "set parallel_pipeline_task_num=8;"
+        result = sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+        assertEquals(8, result.size())
     } finally {
         try_sql("DROP TABLE IF EXISTS select_into_file")
         File path = new File(outFilePath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to