This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c74ca15753 [pipeline](sink) Support Async Writer Sink of result file 
sink and memory scratch sink (#23589)
c74ca15753 is described below

commit c74ca157530a8b10bf24324f957df67e273599a4
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Aug 31 22:44:25 2023 +0800

    [pipeline](sink) Support Async Writer Sink of result file sink and memory 
scratch sink (#23589)
---
 be/src/exec/data_sink.cpp                          | 22 +++----
 be/src/exec/data_sink.h                            |  4 +-
 be/src/runtime/record_batch_queue.h                |  2 +
 be/src/runtime/result_writer.h                     | 11 +---
 .../runtime/vfile_writer_wrapper.h}                | 35 ++++++++--
 be/src/vec/runtime/vparquet_writer.h               | 30 +--------
 .../sink/{vtable_sink.h => async_writer_sink.h}    | 34 ++++------
 be/src/vec/sink/multi_cast_data_stream_sink.h      |  6 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  2 -
 be/src/vec/sink/vdata_stream_sender.h              |  2 -
 be/src/vec/sink/vmemory_scratch_sink.cpp           |  4 ++
 be/src/vec/sink/vmemory_scratch_sink.h             |  2 +-
 be/src/vec/sink/vresult_file_sink.cpp              | 77 ++++++----------------
 be/src/vec/sink/vresult_file_sink.h                | 25 +++----
 be/src/vec/sink/vresult_sink.cpp                   |  1 +
 be/src/vec/sink/vresult_sink.h                     |  4 +-
 be/src/vec/sink/vtablet_sink.h                     |  5 --
 be/src/vec/sink/vtablet_sink_v2.h                  |  5 --
 be/src/vec/sink/writer/async_result_writer.h       |  4 +-
 .../writer}/vfile_result_writer.cpp                | 51 +++++++-------
 .../{runtime => sink/writer}/vfile_result_writer.h | 34 +++++++---
 be/src/vec/sink/writer/vjdbc_table_writer.h        |  2 +
 be/src/vec/sink/writer/vmysql_table_writer.cpp     |  4 +-
 be/src/vec/sink/writer/vmysql_table_writer.h       |  4 +-
 be/src/vec/sink/writer/vodbc_table_writer.h        |  3 +-
 25 files changed, 159 insertions(+), 214 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 53f44dd590..5c40475eeb 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -30,12 +30,12 @@
 #include <string>
 
 #include "common/config.h"
+#include "vec/sink/async_writer_sink.h"
 #include "vec/sink/multi_cast_data_stream_sink.h"
 #include "vec/sink/vdata_stream_sender.h"
 #include "vec/sink/vmemory_scratch_sink.h"
 #include "vec/sink/vresult_file_sink.h"
 #include "vec/sink/vresult_sink.h"
-#include "vec/sink/vtable_sink.h"
 #include "vec/sink/vtablet_sink.h"
 #include "vec/sink/vtablet_sink_v2.h"
 
@@ -92,9 +92,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
                     params.destinations, 
send_query_statistics_with_every_batch, output_exprs,
                     desc_tbl));
         } else {
-            sink->reset(new doris::vectorized::VResultFileSink(
-                    state, pool, row_desc, thrift_sink.result_file_sink,
-                    send_query_statistics_with_every_batch, output_exprs));
+            sink->reset(new doris::vectorized::VResultFileSink(row_desc, 
output_exprs));
         }
         break;
     }
@@ -112,7 +110,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
         vectorized::VMysqlTableSink* vmysql_tbl_sink =
-                new vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
+                new vectorized::VMysqlTableSink(row_desc, output_exprs);
         sink->reset(vmysql_tbl_sink);
         break;
 #else
@@ -124,7 +122,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         if (!thrift_sink.__isset.odbc_table_sink) {
             return Status::InternalError("Missing data odbc sink.");
         }
-        sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, 
output_exprs));
+        sink->reset(new vectorized::VOdbcTableSink(row_desc, output_exprs));
         break;
     }
 
@@ -133,7 +131,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
             return Status::InternalError("Missing data jdbc sink.");
         }
         if (config::enable_java_support) {
-            sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, 
output_exprs));
+            sink->reset(new vectorized::VJdbcTableSink(row_desc, 
output_exprs));
         } else {
             return Status::InternalError(
                     "Jdbc table sink is not enabled, you can change be config "
@@ -234,9 +232,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
                     params.destinations, 
send_query_statistics_with_every_batch, output_exprs,
                     desc_tbl));
         } else {
-            sink->reset(new doris::vectorized::VResultFileSink(
-                    state, pool, row_desc, thrift_sink.result_file_sink,
-                    send_query_statistics_with_every_batch, output_exprs));
+            sink->reset(new doris::vectorized::VResultFileSink(row_desc, 
output_exprs));
         }
         break;
     }
@@ -254,7 +250,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
         vectorized::VMysqlTableSink* vmysql_tbl_sink =
-                new vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
+                new vectorized::VMysqlTableSink(row_desc, output_exprs);
         sink->reset(vmysql_tbl_sink);
         break;
 #else
@@ -266,7 +262,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         if (!thrift_sink.__isset.odbc_table_sink) {
             return Status::InternalError("Missing data odbc sink.");
         }
-        sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, 
output_exprs));
+        sink->reset(new vectorized::VOdbcTableSink(row_desc, output_exprs));
         break;
     }
 
@@ -275,7 +271,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
             return Status::InternalError("Missing data jdbc sink.");
         }
         if (config::enable_java_support) {
-            sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, 
output_exprs));
+            sink->reset(new vectorized::VJdbcTableSink(row_desc, 
output_exprs));
         } else {
             return Status::InternalError(
                     "Jdbc table sink is not enabled, you can change be config "
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 8200ff5d4c..778ee2da83 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -104,7 +104,7 @@ public:
                                    DescriptorTbl& desc_tbl);
 
     // Returns the runtime profile for the sink.
-    virtual RuntimeProfile* profile() = 0;
+    RuntimeProfile* profile() { return _profile; }
 
     virtual void set_query_statistics(std::shared_ptr<QueryStatistics> 
statistics) {
         _query_statistics = statistics;
@@ -121,6 +121,8 @@ protected:
     std::string _name;
     const RowDescriptor& _row_desc;
 
+    RuntimeProfile* _profile = nullptr; // Allocated from _pool
+
     // Maybe this will be transferred to BufferControlBlock.
     std::shared_ptr<QueryStatistics> _query_statistics;
 
diff --git a/be/src/runtime/record_batch_queue.h 
b/be/src/runtime/record_batch_queue.h
index 7ababc9b61..7528b85f09 100644
--- a/be/src/runtime/record_batch_queue.h
+++ b/be/src/runtime/record_batch_queue.h
@@ -59,6 +59,8 @@ public:
     // Shut down the queue. Wakes up all threads waiting on blocking_get or 
blocking_put.
     void shutdown();
 
+    size_t size() { return _queue.get_size(); }
+
 private:
     BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue;
     SpinLock _status_lock;
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index a1458f1a71..b6cdd10c3a 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -34,7 +34,6 @@ class RuntimeState;
 class ResultWriter {
 public:
     ResultWriter() = default;
-    ResultWriter(bool output_object_data) : 
_output_object_data(output_object_data) {}
     virtual ~ResultWriter() = default;
 
     virtual Status init(RuntimeState* state) = 0;
@@ -43,7 +42,7 @@ public:
 
     virtual int64_t get_written_rows() const { return _written_rows; }
 
-    virtual bool output_object_data() const { return _output_object_data; }
+    bool output_object_data() const { return _output_object_data; }
 
     virtual Status append_block(vectorized::Block& block) = 0;
 
@@ -53,17 +52,9 @@ public:
         _output_object_data = output_object_data;
     }
 
-    static const std::string NULL_IN_CSV;
-    virtual void set_header_info(const std::string& header_type, const 
std::string& header) {
-        _header_type = header_type;
-        _header = header;
-    }
-
 protected:
     int64_t _written_rows = 0; // number of rows written
     bool _output_object_data = false;
-    std::string _header_type;
-    std::string _header;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/result_writer.cpp 
b/be/src/vec/runtime/vfile_writer_wrapper.h
similarity index 50%
rename from be/src/runtime/result_writer.cpp
rename to be/src/vec/runtime/vfile_writer_wrapper.h
index b5537e486c..e418a14ffa 100644
--- a/be/src/runtime/result_writer.cpp
+++ b/be/src/vec/runtime/vfile_writer_wrapper.h
@@ -15,12 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/result_writer.h"
+#pragma once
 
-namespace doris {
+#include <memory>
+#include <vector>
 
-const std::string ResultWriter::NULL_IN_CSV = "\\N";
+#include "common/status.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_fwd.h"
 
-}
+namespace doris::vectorized {
 
-/* vim: set ts=4 sw=4 sts=4 tw=100 expandtab : */
+class VFileWriterWrapper {
+public:
+    VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool 
output_object_data)
+            : _output_vexpr_ctxs(output_vexpr_ctxs),
+              _cur_written_rows(0),
+              _output_object_data(output_object_data) {}
+
+    virtual ~VFileWriterWrapper() = default;
+
+    virtual Status prepare() = 0;
+
+    virtual Status write(const Block& block) = 0;
+
+    virtual Status close() = 0;
+
+    virtual int64_t written_len() = 0;
+
+protected:
+    const VExprContextSPtrs& _output_vexpr_ctxs;
+    int64_t _cur_written_rows;
+    bool _output_object_data;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_writer.h 
b/be/src/vec/runtime/vparquet_writer.h
index 22410b5d06..36514c9fe8 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -26,12 +26,7 @@
 #include <parquet/types.h>
 #include <stdint.h>
 
-#include <memory>
-#include <vector>
-
-#include "common/status.h"
-#include "vec/core/block.h"
-#include "vec/exprs/vexpr_fwd.h"
+#include "vfile_writer_wrapper.h"
 
 namespace doris {
 namespace io {
@@ -90,29 +85,6 @@ public:
             const TypeDescriptor& type_desc);
 };
 
-class VFileWriterWrapper {
-public:
-    VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool 
output_object_data)
-            : _output_vexpr_ctxs(output_vexpr_ctxs),
-              _cur_written_rows(0),
-              _output_object_data(output_object_data) {}
-
-    virtual ~VFileWriterWrapper() = default;
-
-    virtual Status prepare() = 0;
-
-    virtual Status write(const Block& block) = 0;
-
-    virtual Status close() = 0;
-
-    virtual int64_t written_len() = 0;
-
-protected:
-    const VExprContextSPtrs& _output_vexpr_ctxs;
-    int64_t _cur_written_rows;
-    bool _output_object_data;
-};
-
 // a wrapper of parquet output stream
 class VParquetWriterWrapper final : public VFileWriterWrapper {
 public:
diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/async_writer_sink.h
similarity index 75%
rename from be/src/vec/sink/vtable_sink.h
rename to be/src/vec/sink/async_writer_sink.h
index fc41faa175..8eb177ce8e 100644
--- a/be/src/vec/sink/vtable_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -42,10 +42,10 @@ namespace vectorized {
 class Block;
 
 template <typename Writer, const char* Name>
-class VTableSink : public DataSink {
+class AsyncWriterSink : public DataSink {
 public:
-    VTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const 
std::vector<TExpr>& t_exprs)
-            : DataSink(row_desc), _pool(pool), _t_output_expr(t_exprs) {
+    AsyncWriterSink(const RowDescriptor& row_desc, const std::vector<TExpr>& 
t_exprs)
+            : DataSink(row_desc), _t_output_expr(t_exprs) {
         _name = Name;
     }
 
@@ -87,14 +87,12 @@ public:
         return _writer->sink(block, eos);
     }
 
-    RuntimeProfile* profile() override { return _profile; }
-
     bool can_write() override { return _writer->can_write(); }
 
     Status close(RuntimeState* state, Status exec_status) override {
         if (_writer->need_normal_close()) {
             if (exec_status.ok() && !state->is_cancelled()) {
-                RETURN_IF_ERROR(_writer->finish_trans());
+                RETURN_IF_ERROR(_writer->commit_trans());
             }
             RETURN_IF_ERROR(_writer->close());
         }
@@ -111,37 +109,31 @@ public:
     bool is_close_done() override { return !_writer->is_pending_finish(); }
 
 protected:
-    // owned by RuntimeState
-    ObjectPool* _pool;
     const std::vector<TExpr>& _t_output_expr;
     VExprContextSPtrs _output_vexpr_ctxs;
     std::unique_ptr<Writer> _writer;
-    RuntimeProfile* _profile;
 };
 
 inline constexpr char VJDBC_TABLE_SINK_NAME[] = "VJdbcTableSink";
 inline constexpr char VODBC_TABLE_SINK_NAME[] = "VOdbcTableSink";
 inline constexpr char VMYSQL_TABLE_SINK_NAME[] = "VMysqlTableSink";
 
-class VJdbcTableSink : public VTableSink<VJdbcTableWriter, 
VJDBC_TABLE_SINK_NAME> {
+class VJdbcTableSink : public AsyncWriterSink<VJdbcTableWriter, 
VJDBC_TABLE_SINK_NAME> {
 public:
-    VJdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                   const std::vector<TExpr>& t_exprs)
-            : VTableSink<VJdbcTableWriter, VJDBC_TABLE_SINK_NAME>(pool, 
row_desc, t_exprs) {};
+    VJdbcTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& 
t_exprs)
+            : AsyncWriterSink<VJdbcTableWriter, 
VJDBC_TABLE_SINK_NAME>(row_desc, t_exprs) {};
 };
 
-class VOdbcTableSink : public VTableSink<VOdbcTableWriter, 
VODBC_TABLE_SINK_NAME> {
+class VOdbcTableSink : public AsyncWriterSink<VOdbcTableWriter, 
VODBC_TABLE_SINK_NAME> {
 public:
-    VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                   const std::vector<TExpr>& t_exprs)
-            : VTableSink<VOdbcTableWriter, VODBC_TABLE_SINK_NAME>(pool, 
row_desc, t_exprs) {};
+    VOdbcTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& 
t_exprs)
+            : AsyncWriterSink<VOdbcTableWriter, 
VODBC_TABLE_SINK_NAME>(row_desc, t_exprs) {};
 };
 
-class VMysqlTableSink : public VTableSink<VMysqlTableWriter, 
VMYSQL_TABLE_SINK_NAME> {
+class VMysqlTableSink : public AsyncWriterSink<VMysqlTableWriter, 
VMYSQL_TABLE_SINK_NAME> {
 public:
-    VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                    const std::vector<TExpr>& t_exprs)
-            : VTableSink<VMysqlTableWriter, VMYSQL_TABLE_SINK_NAME>(pool, 
row_desc, t_exprs) {};
+    VMysqlTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& 
t_exprs)
+            : AsyncWriterSink<VMysqlTableWriter, 
VMYSQL_TABLE_SINK_NAME>(row_desc, t_exprs) {};
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h 
b/be/src/vec/sink/multi_cast_data_stream_sink.h
index df210d74ff..364586cad0 100644
--- a/be/src/vec/sink/multi_cast_data_stream_sink.h
+++ b/be/src/vec/sink/multi_cast_data_stream_sink.h
@@ -25,7 +25,9 @@ namespace doris::vectorized {
 class MultiCastDataStreamSink : public DataSink {
 public:
     MultiCastDataStreamSink(std::shared_ptr<pipeline::MultiCastDataStreamer>& 
streamer)
-            : DataSink(streamer->row_desc()), 
_multi_cast_data_streamer(streamer) {};
+            : DataSink(streamer->row_desc()), 
_multi_cast_data_streamer(streamer) {
+        _profile = _multi_cast_data_streamer->profile();
+    };
 
     ~MultiCastDataStreamSink() override = default;
 
@@ -39,8 +41,6 @@ public:
     // use sink to check can_write, now always true after we support spill to 
disk
     bool can_write() override { return _multi_cast_data_streamer->can_write(); 
}
 
-    RuntimeProfile* profile() override { return 
_multi_cast_data_streamer->profile(); }
-
     std::shared_ptr<pipeline::MultiCastDataStreamer>& 
get_multi_cast_data_streamer() {
         return _multi_cast_data_streamer;
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 4c547c7840..23194db6d9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -300,7 +300,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
           _pool(pool),
           _current_channel_idx(0),
           _part_type(sink.output_partition.type),
-          _profile(nullptr),
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
           _local_send_timer(nullptr),
@@ -365,7 +364,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
           _pool(pool),
           _current_channel_idx(0),
           _part_type(TPartitionType::UNPARTITIONED),
-          _profile(nullptr),
           _serialize_batch_timer(nullptr),
           _compress_timer(nullptr),
           _brpc_send_timer(nullptr),
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index a7cceb1385..a3e4ccf2a6 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -117,7 +117,6 @@ public:
     Status send(RuntimeState* state, Block* block, bool eos = false) override;
     Status try_close(RuntimeState* state, Status exec_status) override;
     Status close(RuntimeState* state, Status exec_status) override;
-    RuntimeProfile* profile() override { return _profile; }
 
     RuntimeState* state() { return _state; }
 
@@ -192,7 +191,6 @@ protected:
     std::vector<Channel<VDataStreamSender>*> _channels;
     std::vector<std::shared_ptr<Channel<VDataStreamSender>>> 
_channel_shared_ptrs;
 
-    RuntimeProfile* _profile; // Allocated from _pool
     RuntimeProfile::Counter* _serialize_batch_timer;
     RuntimeProfile::Counter* _compress_timer;
     RuntimeProfile::Counter* _brpc_send_timer;
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp 
b/be/src/vec/sink/vmemory_scratch_sink.cpp
index 6e9f5ca743..9b65838a3a 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -90,6 +90,10 @@ Status MemoryScratchSink::open(RuntimeState* state) {
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
+bool MemoryScratchSink::can_write() {
+    return _queue->size() < 10;
+}
+
 Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
diff --git a/be/src/vec/sink/vmemory_scratch_sink.h 
b/be/src/vec/sink/vmemory_scratch_sink.h
index c6306c481a..848952b1d7 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.h
+++ b/be/src/vec/sink/vmemory_scratch_sink.h
@@ -60,7 +60,7 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
 
-    RuntimeProfile* profile() override { return _profile; }
+    bool can_write() override;
 
 private:
     Status _prepare_vexpr(RuntimeState* state);
diff --git a/be/src/vec/sink/vresult_file_sink.cpp 
b/be/src/vec/sink/vresult_file_sink.cpp
index e30585419b..66d6414081 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -18,7 +18,6 @@
 #include "vec/sink/vresult_file_sink.h"
 
 #include <gen_cpp/DataSinks_types.h>
-#include <gen_cpp/PaloInternalService_types.h>
 #include <glog/logging.h>
 #include <opentelemetry/nostd/shared_ptr.h>
 #include <time.h>
@@ -29,14 +28,10 @@
 #include "common/config.h"
 #include "common/object_pool.h"
 #include "runtime/buffer_control_block.h"
-#include "runtime/exec_env.h"
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
 #include "util/telemetry/telemetry.h"
-#include "util/uid_util.h"
 #include "vec/exprs/vexpr.h"
-#include "vec/runtime/vfile_result_writer.h"
 
 namespace doris {
 class QueryStatistics;
@@ -45,71 +40,45 @@ class TExpr;
 
 namespace doris::vectorized {
 
-VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
-                                 const RowDescriptor& row_desc, const 
TResultFileSink& sink,
-                                 bool send_query_statistics_with_every_batch,
+VResultFileSink::VResultFileSink(const RowDescriptor& row_desc,
                                  const std::vector<TExpr>& t_output_expr)
-        : DataSink(row_desc), _t_output_expr(t_output_expr) {
-    CHECK(sink.__isset.file_options);
-    _file_opts.reset(new ResultFileOptions(sink.file_options));
-    CHECK(sink.__isset.storage_backend_type);
-    _storage_type = sink.storage_backend_type;
-    _is_top_sink = true;
-
-    _name = "VResultFileSink";
-    //for impl csv_with_name and csv_with_names_and_types
-    _header_type = sink.header_type;
-    _header = sink.header;
-}
+        : AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK>(row_desc, 
t_output_expr) {}
 
 VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int 
sender_id,
                                  const RowDescriptor& row_desc, const 
TResultFileSink& sink,
                                  const std::vector<TPlanFragmentDestination>& 
destinations,
                                  bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr, 
DescriptorTbl& descs)
-        : DataSink(row_desc),
-          _t_output_expr(t_output_expr),
+        : AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK>(row_desc, 
t_output_expr),
           
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) 
{
-    CHECK(sink.__isset.file_options);
-    _file_opts.reset(new ResultFileOptions(sink.file_options));
-    CHECK(sink.__isset.storage_backend_type);
-    _storage_type = sink.storage_backend_type;
     _is_top_sink = false;
     CHECK_EQ(destinations.size(), 1);
     _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, 
row_desc, sink.dest_node_id,
                                                destinations,
                                                
send_query_statistics_with_every_batch));
-
-    _name = "VResultFileSink";
-    //for impl csv_with_name and csv_with_names_and_types
-    _header_type = sink.header_type;
-    _header = sink.header;
 }
 
 Status VResultFileSink::init(const TDataSink& tsink) {
     if (!_is_top_sink) {
         RETURN_IF_ERROR(_stream_sender->init(tsink));
     }
-    return Status::OK();
-}
 
-Status VResultFileSink::prepare_exprs(RuntimeState* state) {
-    // From the thrift expressions create the real exprs.
-    RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
-    // Prepare the exprs to run.
-    RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
-    return Status::OK();
+    auto& sink = tsink.result_file_sink;
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+
+    _name = "VResultFileSink";
+    //for impl csv_with_name and csv_with_names_and_types
+    _header_type = sink.header_type;
+    _header = sink.header;
+
+    return VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs);
 }
 
 Status VResultFileSink::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSink::prepare(state));
-    std::stringstream title;
-    title << "VResultFileSink (fragment_instance_id=" << 
print_id(state->fragment_instance_id())
-          << ")";
-    // create profile
-    _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
-    // prepare output_expr
-    RETURN_IF_ERROR(prepare_exprs(state));
+    RETURN_IF_ERROR(AsyncWriterSink::prepare(state));
 
     CHECK(_file_opts.get() != nullptr);
     if (_is_top_sink) {
@@ -120,7 +89,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
         // create writer
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, 
state->fragment_instance_id(), _output_vexpr_ctxs,
-                _profile, _sender.get(), nullptr, 
state->return_object_data_as_binary(),
+                _sender.get(), nullptr, state->return_object_data_as_binary(),
                 _output_row_descriptor));
     } else {
         // init channel
@@ -128,13 +97,12 @@ Status VResultFileSink::prepare(RuntimeState* state) {
                 
Block::create_unique(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1);
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, 
state->fragment_instance_id(), _output_vexpr_ctxs,
-                _profile, nullptr, _output_block.get(), 
state->return_object_data_as_binary(),
+                nullptr, _output_block.get(), 
state->return_object_data_as_binary(),
                 _output_row_descriptor));
         RETURN_IF_ERROR(_stream_sender->prepare(state));
         _profile->add_child(_stream_sender->profile(), true, nullptr);
     }
     _writer->set_header_info(_header_type, _header);
-    RETURN_IF_ERROR(_writer->init(state));
     return Status::OK();
 }
 
@@ -142,12 +110,7 @@ Status VResultFileSink::open(RuntimeState* state) {
     if (!_is_top_sink) {
         RETURN_IF_ERROR(_stream_sender->open(state));
     }
-    return VExpr::open(_output_vexpr_ctxs, state);
-}
-
-Status VResultFileSink::send(RuntimeState* state, Block* block, bool eos) {
-    RETURN_IF_ERROR(_writer->append_block(*block));
-    return Status::OK();
+    return AsyncWriterSink::open(state);
 }
 
 Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
@@ -157,7 +120,7 @@ Status VResultFileSink::close(RuntimeState* state, Status 
exec_status) {
 
     Status final_status = exec_status;
     // close the writer
-    if (_writer) {
+    if (_writer && _writer->need_normal_close()) {
         Status st = _writer->close();
         if (!st.ok() && exec_status.ok()) {
             // close file writer failed, should return this error to client
diff --git a/be/src/vec/sink/vresult_file_sink.h 
b/be/src/vec/sink/vresult_file_sink.h
index 848a6c371d..fdf4843a52 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -24,11 +24,12 @@
 #include <vector>
 
 #include "common/status.h"
-#include "exec/data_sink.h"
 #include "runtime/descriptors.h"
 #include "vec/core/block.h"
+#include "vec/sink/async_writer_sink.h"
 #include "vec/sink/vdata_stream_sender.h"
 #include "vec/sink/vresult_sink.h"
+#include "vec/sink/writer/vfile_result_writer.h"
 
 namespace doris {
 class BufferControlBlock;
@@ -44,51 +45,43 @@ class TResultFileSink;
 namespace vectorized {
 class VExprContext;
 
-class VResultFileSink : public DataSink {
+inline constexpr char VRESULT_FILE_SINK[] = "VResultFileSink";
+
+class VResultFileSink : public AsyncWriterSink<VFileResultWriter, 
VRESULT_FILE_SINK> {
 public:
-    VResultFileSink(RuntimeState* state, ObjectPool* pool, const 
RowDescriptor& row_desc,
-                    const TResultFileSink& sink, bool 
send_query_statistics_with_every_batch,
-                    const std::vector<TExpr>& t_output_expr);
+    VResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& 
t_output_expr);
+
     VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
                     const RowDescriptor& row_desc, const TResultFileSink& sink,
                     const std::vector<TPlanFragmentDestination>& destinations,
                     bool send_query_statistics_with_every_batch,
                     const std::vector<TExpr>& t_output_expr, DescriptorTbl& 
descs);
-    ~VResultFileSink() override = default;
+
     Status init(const TDataSink& thrift_sink) override;
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
-    // send data in 'batch' to this backend stream mgr
-    // Blocks until all rows in batch are placed in the buffer
-    Status send(RuntimeState* state, Block* block, bool eos = false) override;
+
     // Flush all buffered data and close all existing channels to destination
     // hosts. Further send() calls are illegal after calling close().
     Status close(RuntimeState* state, Status exec_status) override;
-    RuntimeProfile* profile() override { return _profile; }
 
     void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override;
 
 private:
-    Status prepare_exprs(RuntimeState* state);
     // set file options when sink type is FILE
     std::unique_ptr<ResultFileOptions> _file_opts;
     TStorageBackendType::type _storage_type;
 
     // Owned by the RuntimeState.
-    const std::vector<TExpr>& _t_output_expr;
-    VExprContextSPtrs _output_vexpr_ctxs;
     RowDescriptor _output_row_descriptor;
 
     std::unique_ptr<Block> _output_block = nullptr;
     std::shared_ptr<BufferControlBlock> _sender;
     std::unique_ptr<VDataStreamSender> _stream_sender;
-    std::shared_ptr<ResultWriter> _writer;
     int _buf_size = 1024; // Allocated from _pool
     bool _is_top_sink = true;
     std::string _header;
     std::string _header_type;
-
-    RuntimeProfile* _profile;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 2e82011bd7..5fdaaf1628 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -38,6 +38,7 @@
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vec/sink/vmysql_result_writer.h"
+#include "vec/sink/writer/vfile_result_writer.h"
 
 namespace doris {
 class QueryStatistics;
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index c9374d7cf9..a10e60d467 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -135,7 +135,6 @@ public:
     // Flush all buffered data and close all existing channels to destination
     // hosts. Further send() calls are illegal after calling close().
     virtual Status close(RuntimeState* state, Status exec_status) override;
-    virtual RuntimeProfile* profile() override { return _profile; }
 
     void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override;
 
@@ -152,8 +151,7 @@ private:
 
     std::shared_ptr<BufferControlBlock> _sender;
     std::shared_ptr<ResultWriter> _writer;
-    RuntimeProfile* _profile; // Allocated from _pool
-    int _buf_size;            // Allocated from _pool
+    int _buf_size; // Allocated from _pool
 
     // for fetch data by rowids
     TFetchOption _fetch_option;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index e6d4992b48..dc408d392b 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -408,9 +408,6 @@ public:
 
     size_t get_pending_bytes() const;
 
-    // Returns the runtime profile for the sink.
-    RuntimeProfile* profile() override { return _profile; }
-
     // the consumer func of sending pending batches in every NodeChannel.
     // use polling & NodeChannel::try_send_and_fetch_status() to achieve 
nonblocking sending.
     // only focus on pending batches and channel status, the internal errors 
of NodeChannels will be handled by the producer
@@ -464,8 +461,6 @@ private:
     OlapTableLocationParam* _slave_location = nullptr;
     DorisNodesInfo* _nodes_info = nullptr;
 
-    RuntimeProfile* _profile = nullptr;
-
     std::unique_ptr<OlapTabletFinder> _tablet_finder;
 
     // index_channel
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index 5f50463268..047377f4e2 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -132,9 +132,6 @@ public:
     Status close(RuntimeState* state, Status close_status) override;
     Status send(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override;
 
-    // Returns the runtime profile for the sink.
-    RuntimeProfile* profile() override { return _profile; }
-
 private:
     Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
                              LoadStreamStub& stub_template);
@@ -181,8 +178,6 @@ private:
     OlapTableLocationParam* _location = nullptr;
     DorisNodesInfo* _nodes_info = nullptr;
 
-    RuntimeProfile* _profile = nullptr;
-
     std::unique_ptr<OlapTabletFinder> _tablet_finder;
 
     std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
diff --git a/be/src/vec/sink/writer/async_result_writer.h 
b/be/src/vec/sink/writer/async_result_writer.h
index b294e110a9..2f70b05266 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -54,7 +54,9 @@ public:
 
     virtual bool in_transaction() { return false; }
 
-    bool need_normal_close() { return _need_normal_close; }
+    virtual Status commit_trans() { return Status::OK(); }
+
+    bool need_normal_close() const { return _need_normal_close; }
 
     Status init(RuntimeState* state) override { return Status::OK(); }
 
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
similarity index 94%
rename from be/src/vec/runtime/vfile_result_writer.cpp
rename to be/src/vec/sink/writer/vfile_result_writer.cpp
index 9d5fc4e158..e54c426f74 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/runtime/vfile_result_writer.h"
+#include "vfile_result_writer.h"
 
 #include <gen_cpp/Data_types.h>
 #include <gen_cpp/Metrics_types.h>
@@ -72,18 +72,20 @@ namespace doris::vectorized {
 const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
 using doris::operator<<;
 
+VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs)
+        : AsyncResultWriter(output_exprs) {}
+
 VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
                                      const TStorageBackendType::type 
storage_type,
                                      const TUniqueId fragment_instance_id,
                                      const VExprContextSPtrs& 
output_vexpr_ctxs,
-                                     RuntimeProfile* parent_profile, 
BufferControlBlock* sinker,
-                                     Block* output_block, bool 
output_object_data,
+                                     BufferControlBlock* sinker, Block* 
output_block,
+                                     bool output_object_data,
                                      const RowDescriptor& 
output_row_descriptor)
-        : _file_opts(file_opts),
+        : AsyncResultWriter(output_vexpr_ctxs),
+          _file_opts(file_opts),
           _storage_type(storage_type),
           _fragment_instance_id(fragment_instance_id),
-          _output_vexpr_ctxs(output_vexpr_ctxs),
-          _parent_profile(parent_profile),
           _sinker(sinker),
           _output_block(output_block),
           _output_row_descriptor(output_row_descriptor),
@@ -91,9 +93,9 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions* 
file_opts,
     _output_object_data = output_object_data;
 }
 
-Status VFileResultWriter::init(RuntimeState* state) {
+Status VFileResultWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
     _state = state;
-    _init_profile();
+    _init_profile(profile);
     // Delete existing files
     if (_file_opts->delete_existing_files) {
         RETURN_IF_ERROR(_delete_dir());
@@ -101,8 +103,8 @@ Status VFileResultWriter::init(RuntimeState* state) {
     return _create_next_file_writer();
 }
 
-void VFileResultWriter::_init_profile() {
-    RuntimeProfile* profile = 
_parent_profile->create_child("VFileResultWriter", true, true);
+void VFileResultWriter::_init_profile(RuntimeProfile* parent_profile) {
+    RuntimeProfile* profile = 
parent_profile->create_child("VFileResultWriter", true, true);
     _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
     _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", 
"AppendBatchTime");
     _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", 
"AppendBatchTime");
@@ -160,13 +162,13 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
         break;
     case TFileFormatType::FORMAT_PARQUET:
         _vfile_writer.reset(new VParquetWriterWrapper(
-                _file_writer_impl.get(), _output_vexpr_ctxs, 
_file_opts->parquet_schemas,
+                _file_writer_impl.get(), _vec_output_expr_ctxs, 
_file_opts->parquet_schemas,
                 _file_opts->parquet_commpression_type, 
_file_opts->parquert_disable_dictionary,
                 _file_opts->parquet_version, _output_object_data));
         RETURN_IF_ERROR(_vfile_writer->prepare());
         break;
     case TFileFormatType::FORMAT_ORC:
-        _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), 
_output_vexpr_ctxs,
+        _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), 
_vec_output_expr_ctxs,
                                                   _file_opts->orc_schema, 
_output_object_data));
         RETURN_IF_ERROR(_vfile_writer->prepare());
         break;
@@ -237,12 +239,9 @@ Status VFileResultWriter::append_block(Block& block) {
     }
     RETURN_IF_ERROR(write_csv_header());
     SCOPED_TIMER(_append_row_batch_timer);
-    Status status = Status::OK();
-    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
-    // failed, just return the error status
     Block output_block;
-    
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
 block,
-                                                                       
&output_block));
+    RETURN_IF_ERROR(_projection_block(block, &output_block));
+
     if (_vfile_writer) {
         RETURN_IF_ERROR(_write_file(output_block));
     } else {
@@ -267,7 +266,7 @@ Status VFileResultWriter::_write_csv_file(const Block& 
block) {
             if (col.column->is_null_at(i)) {
                 _plain_text_outstream << NULL_IN_CSV;
             } else {
-                switch (_output_vexpr_ctxs[col_id]->root()->type().type) {
+                switch (_vec_output_expr_ctxs[col_id]->root()->type().type) {
                 case TYPE_BOOLEAN:
                 case TYPE_TINYINT:
                     _plain_text_outstream << (int)*reinterpret_cast<const 
int8_t*>(
@@ -327,7 +326,7 @@ Status VFileResultWriter::_write_csv_file(const Block& 
block) {
                     const DateV2Value<DateTimeV2ValueType>* time_val =
                             (const 
DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i)
                                                                               
.data);
-                    time_val->to_string(buf, 
_output_vexpr_ctxs[col_id]->root()->type().scale);
+                    time_val->to_string(buf, 
_vec_output_expr_ctxs[col_id]->root()->type().scale);
                     _plain_text_outstream << buf;
                     break;
                 }
@@ -406,9 +405,9 @@ Status VFileResultWriter::_write_csv_file(const Block& 
block) {
 
 std::string VFileResultWriter::gen_types() {
     std::string types;
-    int num_columns = _output_vexpr_ctxs.size();
+    int num_columns = _vec_output_expr_ctxs.size();
     for (int i = 0; i < num_columns; ++i) {
-        types += type_to_string(_output_vexpr_ctxs[i]->root()->type().type);
+        types += type_to_string(_vec_output_expr_ctxs[i]->root()->type().type);
         if (i < num_columns - 1) {
             types += _file_opts->column_separator;
         }
@@ -419,7 +418,7 @@ std::string VFileResultWriter::gen_types() {
 
 Status VFileResultWriter::write_csv_header() {
     if (!_header_sent && _header.size() > 0) {
-        std::string tmp_header = _header;
+        std::string tmp_header(_header);
         if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
             tmp_header += gen_types();
         }
@@ -628,9 +627,13 @@ Status VFileResultWriter::close() {
     // because `_close_file_writer()` may be called in deconstructor,
     // at that time, the RuntimeState may already been deconstructed,
     // so does the profile in RuntimeState.
-    COUNTER_SET(_written_rows_counter, _written_rows);
-    SCOPED_TIMER(_writer_close_timer);
+    if (_written_rows_counter) {
+        COUNTER_SET(_written_rows_counter, _written_rows);
+        SCOPED_TIMER(_writer_close_timer);
+    }
     return _close_file_writer(true);
 }
 
+const string VFileResultWriter::NULL_IN_CSV = "\\N";
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
similarity index 85%
rename from be/src/vec/runtime/vfile_result_writer.h
rename to be/src/vec/sink/writer/vfile_result_writer.h
index 4ec425755d..b56e41c377 100644
--- a/be/src/vec/runtime/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -29,10 +29,10 @@
 #include "common/status.h"
 #include "io/fs/file_writer.h"
 #include "runtime/descriptors.h"
-#include "runtime/result_writer.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vfile_writer_wrapper.h"
+#include "vec/sink/writer/async_result_writer.h"
 
 namespace doris {
 class BufferControlBlock;
@@ -47,35 +47,45 @@ struct ResultFileOptions;
 namespace doris::vectorized {
 
 // write result to file
-class VFileResultWriter final : public ResultWriter {
+class VFileResultWriter final : public AsyncResultWriter {
 public:
     VFileResultWriter(const ResultFileOptions* file_option,
                       const TStorageBackendType::type storage_type,
                       const TUniqueId fragment_instance_id,
-                      const VExprContextSPtrs& _output_vexpr_ctxs, 
RuntimeProfile* parent_profile,
-                      BufferControlBlock* sinker, Block* output_block, bool 
output_object_data,
+                      const VExprContextSPtrs& _output_vexpr_ctxs, 
BufferControlBlock* sinker,
+                      Block* output_block, bool output_object_data,
                       const RowDescriptor& output_row_descriptor);
-    virtual ~VFileResultWriter() = default;
+
+    VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
 
     Status append_block(Block& block) override;
 
-    Status init(RuntimeState* state) override;
     Status close() override;
 
+    Status open(RuntimeState* state, RuntimeProfile* profile) override {
+        return _init(state, profile);
+    }
+
     // file result writer always return statistic result in one row
     int64_t get_written_rows() const override { return 1; }
 
     std::string gen_types();
     Status write_csv_header();
 
+    void set_header_info(const std::string& header_type, const std::string& 
header) {
+        _header_type = header_type;
+        _header = header;
+    }
+
 private:
+    Status _init(RuntimeState* state, RuntimeProfile*);
     Status _write_file(const Block& block);
     Status _write_csv_file(const Block& block);
 
     // if buffer exceed the limit, write the data buffered in 
_plain_text_outstream via file_writer
     // if eos, write the data even if buffer is not full.
     Status _flush_plain_text_outstream(bool eos);
-    void _init_profile();
+    void _init_profile(RuntimeProfile*);
 
     Status _create_file_writer(const std::string& file_name);
     Status _create_next_file_writer();
@@ -96,11 +106,12 @@ private:
     // delete the dir of file_path
     Status _delete_dir();
 
+    static const std::string NULL_IN_CSV;
+
     RuntimeState* _state; // not owned, set when init
     const ResultFileOptions* _file_opts;
     TStorageBackendType::type _storage_type;
     TUniqueId _fragment_instance_id;
-    const VExprContextSPtrs& _output_vexpr_ctxs;
 
     // If the result file format is plain text, like CSV, this _file_writer is 
owned by this FileResultWriter.
     // If the result file format is Parquet, this _file_writer is owned by 
_parquet_writer.
@@ -119,7 +130,6 @@ private:
     // the suffix idx of export file name, start at 0
     int _file_idx = 0;
 
-    RuntimeProfile* _parent_profile; // profile from result sink, not owned
     // total time cost on append batch operation
     RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
     // tuple convert timer, child timer of _append_row_batch_timer
@@ -142,5 +152,9 @@ private:
     RowDescriptor _output_row_descriptor;
     // parquet/orc file writer
     std::unique_ptr<VFileWriterWrapper> _vfile_writer;
+
+    std::string_view _header_type;
+    std::string_view _header;
+    std::unique_ptr<VFileResultWriter> _writer;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h 
b/be/src/vec/sink/writer/vjdbc_table_writer.h
index 1aa1e59482..205f0835f5 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -50,6 +50,8 @@ public:
 
     bool in_transaction() override { return 
TableConnector::_is_in_transaction; }
 
+    Status commit_trans() override { return JdbcConnector::finish_trans(); }
+
 private:
     JdbcConnectorParam _param;
 };
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp 
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index a9dad2ca80..6897a6b3e2 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -115,12 +115,12 @@ Status VMysqlTableWriter::append_block(vectorized::Block& 
block) {
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
     for (int i = 0; i < num_rows; ++i) {
-        RETURN_IF_ERROR(insert_row(output_block, i));
+        RETURN_IF_ERROR(_insert_row(output_block, i));
     }
     return Status::OK();
 }
 
-Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
+Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) {
     _insert_stmt_buffer.clear();
     fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", 
_conn_info.table_name);
     int num_columns = _vec_output_expr_ctxs.size();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h 
b/be/src/vec/sink/writer/vmysql_table_writer.h
index bb134f9a58..9028801144 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -53,12 +53,10 @@ public:
 
     Status append_block(vectorized::Block& block) override;
 
-    Status finish_trans() { return Status::OK(); }
-
     Status close() override;
 
 private:
-    Status insert_row(vectorized::Block& block, size_t row);
+    Status _insert_row(vectorized::Block& block, size_t row);
     MysqlConnInfo _conn_info;
     fmt::memory_buffer _insert_stmt_buffer;
     MYSQL* _mysql_conn;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h 
b/be/src/vec/sink/writer/vodbc_table_writer.h
index e07f44c9e4..3df973e4b1 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -50,6 +49,8 @@ public:
     Status close() override { return ODBCConnector::close(); }
 
     bool in_transaction() override { return 
TableConnector::_is_in_transaction; }
+
+    Status commit_trans() override { return ODBCConnector::finish_trans(); }
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to