This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b84aefa013f [Bug](outfile) fix missing result when parallel outfile enabled (#43205) (#44184)
b84aefa013f is described below

commit b84aefa013f22272150f3980832d93334328ebf6
Author: Pxl <x...@selectdb.com>
AuthorDate: Mon Nov 18 20:47:22 2024 +0800

    [Bug](outfile) fix missing result when parallel outfile enabled (#43205) (#44184)
    
    pick from #43205
---
 be/src/pipeline/exec/operator.h                      |  4 ++--
 be/src/pipeline/exec/result_file_sink_operator.cpp   |  1 +
 regression-test/suites/export_p0/test_outfile.groovy | 15 ++++++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index be54b7c4999..301f7599737 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -839,9 +839,9 @@ public:
 
 template <typename Writer, typename Parent>
     requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
-class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> {
+class AsyncWriterSink : public PipelineXSinkLocalState<BasicSharedState> {
 public:
-    using Base = PipelineXSinkLocalState<FakeSharedState>;
+    using Base = PipelineXSinkLocalState<BasicSharedState>;
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _async_writer_dependency(nullptr) {
         _finish_dependency =
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 7c9c38ece5c..bc4e4c88d14 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -95,6 +95,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& i
                 state->fragment_instance_id(), p._buf_size, &_sender, 
state->execution_timeout(),
                 state->batch_size()));
     }
+    _sender->set_dependency(state->fragment_instance_id(), 
_dependency->shared_from_this());
 
     // create writer
     _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
diff --git a/regression-test/suites/export_p0/test_outfile.groovy 
b/regression-test/suites/export_p0/test_outfile.groovy
index 8b60803e185..56ede909514 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -201,17 +201,22 @@ suite("test_outfile") {
                      `name` varchar(30)
                  ) ENGINE=OLAP
                    DUPLICATE KEY(`id`)
-                   DISTRIBUTED BY HASH(`id`) BUCKETS 2
+                   DISTRIBUTED BY HASH(`id`) BUCKETS 16
                    PROPERTIES (
                    "replication_allocation" = "tag.location.default: 1"
                    );"""
         sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"),
                     (4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"),
-                    (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
+                    (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd"),(1, 
"b"),(2, "z"),(3, "a"),
+                    (44, "c"), (55, "睿"), (66, "多"), (77, "丝"), (88, "test"),
+                    (1000, "aa"), (1111, "bb"), (1234, "cc"), (2222, "dd");"""
         sql "set enable_parallel_outfile = true;"
-        sql """select * from select_into_file into outfile 
"file://${outFilePath}/";"""
-
-        sql """select * from select_into_file into outfile 
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+        sql "set parallel_pipeline_task_num=4;"
+        def result = sql """select * from select_into_file into outfile 
"file://${outFilePath}/";"""
+        assertEquals(4, result.size())
+        sql "set parallel_pipeline_task_num=8;"
+        result = sql """select * from select_into_file into outfile 
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+        assertEquals(8, result.size())
     } finally {
         try_sql("DROP TABLE IF EXISTS select_into_file")
         File path = new File(outFilePath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to