This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a723b05a4f [fix](Outfile) add two fields to `SELECT INTO OUTFILE` 
(#48144)
3a723b05a4f is described below

commit 3a723b05a4fb4794b5698b06f966887d760487bd
Author: Tiewei Fang <fangtie...@selectdb.com>
AuthorDate: Fri Feb 28 13:57:42 2025 +0800

    [fix](Outfile) add two fields to `SELECT INTO OUTFILE` (#48144)
    
    Problem Summary:
    
    To better monitor Outfile performance, this adds two new columns to the
    result set returned by `SELECT INTO OUTFILE`: WriteTimeSec and WriteSpeedKB.
    WriteTimeSec is the time each writer spends writing data, in seconds
    (returned as a string with three decimal places).
    WriteSpeedKB is each writer's average write throughput, in KB/s
    (returned as a string with two decimal places; 0 if no write time was recorded).
---
 be/src/vec/sink/writer/vfile_result_writer.cpp     | 78 ++++++++++++++++------
 be/src/vec/sink/writer/vfile_result_writer.h       |  2 +
 .../org/apache/doris/analysis/OutFileClause.java   |  6 ++
 .../org/apache/doris/load/ExportTaskExecutor.java  |  2 +
 .../java/org/apache/doris/load/OutfileInfo.java    |  6 ++
 .../org/apache/doris/planner/ResultFileSink.java   | 45 +++++--------
 6 files changed, 88 insertions(+), 51 deletions(-)

diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 5161cf2928d..bb6a54c4693 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "vfile_result_writer.h"
 
+#include <fmt/format.h>
 #include <gen_cpp/Data_types.h>
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/PaloInternalService_types.h>
@@ -39,6 +40,7 @@
 #include "io/hdfs_builder.h"
 #include "pipeline/exec/result_sink_operator.h"
 #include "runtime/buffer_control_block.h"
+#include "runtime/decimalv2_value.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/large_int_value.h"
@@ -60,6 +62,8 @@
 
 namespace doris::vectorized {
 
+static double nons_to_second = 1000000000.00; // nanoseconds per second: converts _file_write_timer values (ns) to seconds. NOTE(review): mutable global; `static constexpr double` would prevent accidental writes
+
 VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs,
                                      std::shared_ptr<pipeline::Dependency> dep,
                                      std::shared_ptr<pipeline::Dependency> 
fin_dep)
@@ -271,19 +275,32 @@ Status VFileResultWriter::_send_result() {
     _is_result_sent = true;
 
     // The final stat result include:
-    // FileNumber, TotalRows, FileSize and URL
-    // The type of these field should be consistent with types defined
-    // in OutFileClause.java of FE.
+    // | FileNumber      | Int     |
+    // | TotalRows       | Bigint  |
+    // | FileSize        | Bigint  |
+    // | URL             | Varchar |
+    // | WriteTimeSec    | Varchar |
+    // | WriteSpeedKB    | Varchar |
+    // The type of these field should be consistent with types defined in 
OutFileClause.java of FE.
     MysqlRowBuffer<> row_buffer;
-    row_buffer.push_int(_file_idx);                         // file number
-    row_buffer.push_bigint(_written_rows_counter->value()); // total rows
-    row_buffer.push_bigint(_written_data_bytes->value());   // file size
+    row_buffer.push_int(_file_idx);                         // FileNumber
+    row_buffer.push_bigint(_written_rows_counter->value()); // TotalRows
+    row_buffer.push_bigint(_written_data_bytes->value());   // FileSize
     std::string file_url;
     _get_file_url(&file_url);
     std::stringstream ss;
     ss << file_url << "*";
     file_url = ss.str();
-    row_buffer.push_string(file_url.c_str(), file_url.length()); // url
+    row_buffer.push_string(file_url.c_str(), file_url.length()); // URL
+    double write_time = _file_write_timer->value() / nons_to_second;
+    std::string formatted_write_time = fmt::format("{:.3f}", write_time);
+    row_buffer.push_string(formatted_write_time.c_str(),
+                           formatted_write_time.length()); // WriteTimeSec
+
+    double write_speed = _get_write_speed(_written_data_bytes->value(), 
_file_write_timer->value());
+    std::string formatted_write_speed = fmt::format("{:.2f}", write_speed);
+    row_buffer.push_string(formatted_write_speed.c_str(),
+                           formatted_write_speed.length()); // WriteSpeedKB
 
     std::unique_ptr<TFetchDataResult> result = 
std::make_unique<TFetchDataResult>();
     result->result_batch.rows.resize(1);
@@ -295,6 +312,8 @@ Status VFileResultWriter::_send_result() {
             std::make_pair("TotalRows", 
std::to_string(_written_rows_counter->value())));
     attach_infos.insert(std::make_pair("FileSize", 
std::to_string(_written_data_bytes->value())));
     attach_infos.insert(std::make_pair("URL", file_url));
+    attach_infos.insert(std::make_pair("WriteTimeSec", formatted_write_time));
+    attach_infos.insert(std::make_pair("WriteSpeedKB", formatted_write_speed));
 
     result->result_batch.__set_attached_infos(attach_infos);
     RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
@@ -309,20 +328,29 @@ Status VFileResultWriter::_fill_result_block() {
     _is_result_sent = true;
 
 #ifndef INSERT_TO_COLUMN
-#define INSERT_TO_COLUMN                                                       
     \
-    if (i == 0) {                                                              
     \
-        column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0);     
     \
-    } else if (i == 1) {                                                       
     \
-        int64_t written_rows = _written_rows_counter->value();                 
     \
-        column->insert_data(reinterpret_cast<const char*>(&written_rows), 0);  
     \
-    } else if (i == 2) {                                                       
     \
-        int64_t written_data_bytes = _written_data_bytes->value();             
     \
-        column->insert_data(reinterpret_cast<const 
char*>(&written_data_bytes), 0); \
-    } else if (i == 3) {                                                       
     \
-        std::string file_url;                                                  
     \
-        static_cast<void>(_get_file_url(&file_url));                           
     \
-        column->insert_data(file_url.c_str(), file_url.size());                
     \
-    }                                                                          
     \
+#define INSERT_TO_COLUMN                                                       
             \
+    if (i == 0) {                                                              
             \
+        column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0);     
             \
+    } else if (i == 1) {                                                       
             \
+        int64_t written_rows = _written_rows_counter->value();                 
             \
+        column->insert_data(reinterpret_cast<const char*>(&written_rows), 0);  
             \
+    } else if (i == 2) {                                                       
             \
+        int64_t written_data_bytes = _written_data_bytes->value();             
             \
+        column->insert_data(reinterpret_cast<const 
char*>(&written_data_bytes), 0);         \
+    } else if (i == 3) {                                                       
             \
+        std::string file_url;                                                  
             \
+        static_cast<void>(_get_file_url(&file_url));                           
             \
+        column->insert_data(file_url.c_str(), file_url.size());                
             \
+    } else if (i == 4) {                                                       
             \
+        double write_time = _file_write_timer->value() / nons_to_second;       
             \
+        std::string formatted_write_time = fmt::format("{:.3f}", write_time);  
             \
+        column->insert_data(formatted_write_time.c_str(), 
formatted_write_time.size());     \
+    } else if (i == 5) {                                                       
             \
+        double write_speed =                                                   
             \
+                _get_write_speed(_written_data_bytes->value(), 
_file_write_timer->value()); \
+        std::string formatted_write_speed = fmt::format("{:.2f}", 
write_speed);             \
+        column->insert_data(formatted_write_speed.c_str(), 
formatted_write_speed.size());   \
+    }                                                                          
             \
     _output_block->replace_by_position(i, std::move(column));
 #endif
 
@@ -387,6 +415,14 @@ Status VFileResultWriter::_delete_dir() {
     }
 }
 
+double VFileResultWriter::_get_write_speed(int64_t write_bytes, int64_t 
write_time) {
+    if (write_time <= 0) {
+        return 0; // no measured write time: report 0 instead of dividing by zero
+    }
+    // Average throughput in KB/s: write_time is in nanoseconds, so bytes * 1e9 / ns = bytes/s, then / 1024 = KB/s.
+    return ((write_bytes * nons_to_second) / (write_time)) / 1024;
+}
+
 Status VFileResultWriter::close(Status exec_status) {
     Status st = exec_status;
     if (st.ok()) {
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
index bf0a5d3e9e2..8b611d7ceef 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -21,6 +21,7 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <cstdint>
 #include <iosfwd>
 #include <memory>
 #include <string>
@@ -99,6 +100,7 @@ private:
     Status _fill_result_block();
     // delete the dir of file_path
     Status _delete_dir();
+    double _get_write_speed(int64_t write_bytes, int64_t write_time);
 
     RuntimeState* _state; // not owned, set when init
     const pipeline::ResultFileOptions* _file_opts = nullptr;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index d4a7b25ed5a..adf601da1ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -79,17 +79,23 @@ public class OutFileClause {
     public static final String TOTAL_ROWS = "TotalRows";
     public static final String FILE_SIZE = "FileSize";
     public static final String URL = "URL";
+    public static final String WRITE_TIME_SEC = "WriteTimeSec";
+    public static final String WRITE_SPEED_KB = "WriteSpeedKB";
 
     static {
         RESULT_COL_NAMES.add(FILE_NUMBER);
         RESULT_COL_NAMES.add(TOTAL_ROWS);
         RESULT_COL_NAMES.add(FILE_SIZE);
         RESULT_COL_NAMES.add(URL);
+        RESULT_COL_NAMES.add(WRITE_TIME_SEC);
+        RESULT_COL_NAMES.add(WRITE_SPEED_KB);
 
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT));
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
+        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
+        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
 
         PARQUET_REPETITION_TYPE_MAP.put("required", 
TParquetRepetitionType.REQUIRED);
         PARQUET_REPETITION_TYPE_MAP.put("repeated", 
TParquetRepetitionType.REPEATED);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index b81fb5404c1..3d321cfc9fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -215,6 +215,8 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
             outfileInfoOneRow.setTotalRows(row.get(OutFileClause.TOTAL_ROWS));
             outfileInfoOneRow.setFileSize(row.get(OutFileClause.FILE_SIZE));
             outfileInfoOneRow.setUrl(row.get(OutFileClause.URL));
+            
outfileInfoOneRow.setWriteTime(row.get(OutFileClause.WRITE_TIME_SEC));
+            
outfileInfoOneRow.setWriteSpeed(row.get(OutFileClause.WRITE_SPEED_KB));
             outfileInfo.add(outfileInfoOneRow);
         }
         return outfileInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
index b9befd9d326..262bf98adef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
@@ -34,4 +34,10 @@ public class OutfileInfo {
 
     @SerializedName("url")
     private String url;
+
+    @SerializedName("writeTime")
+    private String writeTime;
+
+    @SerializedName("writeSpeed")
+    private String writeSpeed;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 631339c3732..3dbd5bc2115 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -24,8 +24,6 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
@@ -145,39 +143,26 @@ public class ResultFileSink extends DataSink {
 
     /**
      * Construct a tuple for file status, the tuple schema as following:
-     * | FileNumber | Int     |
-     * | TotalRows  | Bigint  |
-     * | FileSize   | Bigint  |
-     * | URL        | Varchar |
+     * | FileNumber    | Int     |
+     * | TotalRows     | Bigint  |
+     * | FileSize      | Bigint  |
+     * | URL           | Varchar |
+     * | WriteTimeSec  | Varchar |
+     * | WriteSpeedKB  | Varchar |
      */
     public static TupleDescriptor constructFileStatusTupleDesc(DescriptorTable 
descriptorTable) {
         TupleDescriptor resultFileStatusTupleDesc =
                 descriptorTable.createTupleDescriptor("result_file_status");
         resultFileStatusTupleDesc.setIsMaterialized(true);
-        SlotDescriptor fileNumber = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        fileNumber.setLabel("FileNumber");
-        fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
-        fileNumber.setColumn(new Column("FileNumber", 
ScalarType.createType(PrimitiveType.INT)));
-        fileNumber.setIsMaterialized(true);
-        fileNumber.setIsNullable(false);
-        SlotDescriptor totalRows = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        totalRows.setLabel("TotalRows");
-        totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
-        totalRows.setColumn(new Column("TotalRows", 
ScalarType.createType(PrimitiveType.BIGINT)));
-        totalRows.setIsMaterialized(true);
-        totalRows.setIsNullable(false);
-        SlotDescriptor fileSize = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        fileSize.setLabel("FileSize");
-        fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
-        fileSize.setColumn(new Column("FileSize", 
ScalarType.createType(PrimitiveType.BIGINT)));
-        fileSize.setIsMaterialized(true);
-        fileSize.setIsNullable(false);
-        SlotDescriptor url = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        url.setLabel("URL");
-        url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
-        url.setColumn(new Column("URL", 
ScalarType.createType(PrimitiveType.VARCHAR)));
-        url.setIsMaterialized(true);
-        url.setIsNullable(false);
+        for (int i = 0; i < OutFileClause.RESULT_COL_NAMES.size(); ++i) { // one materialized, non-nullable slot per result column, in RESULT_COL_NAMES / RESULT_COL_TYPES order
+            SlotDescriptor slotDescriptor = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
+            slotDescriptor.setLabel(OutFileClause.RESULT_COL_NAMES.get(i));
+            slotDescriptor.setType(OutFileClause.RESULT_COL_TYPES.get(i));
+            slotDescriptor.setColumn(new 
Column(OutFileClause.RESULT_COL_NAMES.get(i),
+                    OutFileClause.RESULT_COL_TYPES.get(i)));
+            slotDescriptor.setIsMaterialized(true);
+            slotDescriptor.setIsNullable(false);
+        }
         resultFileStatusTupleDesc.computeStatAndMemLayout();
         return resultFileStatusTupleDesc;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to