This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3a723b05a4f [fix](Outfile) add two fields to `SELECT INTO OUTFILE` (#48144) 3a723b05a4f is described below commit 3a723b05a4fb4794b5698b06f966887d760487bd Author: Tiewei Fang <fangtie...@selectdb.com> AuthorDate: Fri Feb 28 13:57:42 2025 +0800 [fix](Outfile) add two fields to `SELECT INTO OUTFILE` (#48144) Problem Summary: To better monitor the performance of Outfile, we've added two new fields to the Outfile return results: WriteTime (result column `WriteTimeSec`) and WriteSpeed (result column `WriteSpeedKB`). WriteTime is the time each writer takes to write data, measured in seconds. WriteSpeed is the average data write speed for each writer, measured in KB/s (1 KB = 1024 bytes). --- be/src/vec/sink/writer/vfile_result_writer.cpp | 78 ++++++++++++++++------ be/src/vec/sink/writer/vfile_result_writer.h | 2 + .../org/apache/doris/analysis/OutFileClause.java | 6 ++ .../org/apache/doris/load/ExportTaskExecutor.java | 2 + .../java/org/apache/doris/load/OutfileInfo.java | 6 ++ .../org/apache/doris/planner/ResultFileSink.java | 45 +++++-------- 6 files changed, 88 insertions(+), 51 deletions(-) diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index 5161cf2928d..bb6a54c4693 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -17,6 +17,7 @@ #include "vfile_result_writer.h" +#include <fmt/format.h> #include <gen_cpp/Data_types.h> #include <gen_cpp/Metrics_types.h> #include <gen_cpp/PaloInternalService_types.h> @@ -39,6 +40,7 @@ #include "io/hdfs_builder.h" #include "pipeline/exec/result_sink_operator.h" #include "runtime/buffer_control_block.h" +#include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/large_int_value.h" @@ -60,6 +62,8 @@ namespace doris::vectorized { +static double nons_to_second = 1000000000.00; + VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs, std::shared_ptr<pipeline::Dependency> dep, std::shared_ptr<pipeline::Dependency> fin_dep) @@ -271,19 +275,32 @@ Status VFileResultWriter::_send_result() { _is_result_sent = true; // The final stat result include: - // FileNumber, TotalRows, FileSize and URL - // The type of these field should be consistent with types defined - // in OutFileClause.java of FE. + // | FileNumber | Int | + // | TotalRows | Bigint | + // | FileSize | Bigint | + // | URL | Varchar | + // | WriteTimeSec | Varchar | + // | WriteSpeedKB | Varchar | + // The type of these field should be consistent with types defined in OutFileClause.java of FE. MysqlRowBuffer<> row_buffer; - row_buffer.push_int(_file_idx); // file number - row_buffer.push_bigint(_written_rows_counter->value()); // total rows - row_buffer.push_bigint(_written_data_bytes->value()); // file size + row_buffer.push_int(_file_idx); // FileNumber + row_buffer.push_bigint(_written_rows_counter->value()); // TotalRows + row_buffer.push_bigint(_written_data_bytes->value()); // FileSize std::string file_url; _get_file_url(&file_url); std::stringstream ss; ss << file_url << "*"; file_url = ss.str(); - row_buffer.push_string(file_url.c_str(), file_url.length()); // url + row_buffer.push_string(file_url.c_str(), file_url.length()); // URL + double write_time = _file_write_timer->value() / nons_to_second; + std::string formatted_write_time = fmt::format("{:.3f}", write_time); + row_buffer.push_string(formatted_write_time.c_str(), + formatted_write_time.length()); // WriteTimeSec + + double write_speed = _get_write_speed(_written_data_bytes->value(), _file_write_timer->value()); + std::string formatted_write_speed = fmt::format("{:.2f}", write_speed); + row_buffer.push_string(formatted_write_speed.c_str(), + formatted_write_speed.length()); // WriteSpeedKB std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>(); result->result_batch.rows.resize(1); @@ -295,6 +312,8 @@ Status 
VFileResultWriter::_send_result() { std::make_pair("TotalRows", std::to_string(_written_rows_counter->value()))); attach_infos.insert(std::make_pair("FileSize", std::to_string(_written_data_bytes->value()))); attach_infos.insert(std::make_pair("URL", file_url)); + attach_infos.insert(std::make_pair("WriteTimeSec", formatted_write_time)); + attach_infos.insert(std::make_pair("WriteSpeedKB", formatted_write_speed)); result->result_batch.__set_attached_infos(attach_infos); RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result), @@ -309,20 +328,29 @@ Status VFileResultWriter::_fill_result_block() { _is_result_sent = true; #ifndef INSERT_TO_COLUMN -#define INSERT_TO_COLUMN \ - if (i == 0) { \ - column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0); \ - } else if (i == 1) { \ - int64_t written_rows = _written_rows_counter->value(); \ - column->insert_data(reinterpret_cast<const char*>(&written_rows), 0); \ - } else if (i == 2) { \ - int64_t written_data_bytes = _written_data_bytes->value(); \ - column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \ - } else if (i == 3) { \ - std::string file_url; \ - static_cast<void>(_get_file_url(&file_url)); \ - column->insert_data(file_url.c_str(), file_url.size()); \ - } \ +#define INSERT_TO_COLUMN \ + if (i == 0) { \ + column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0); \ + } else if (i == 1) { \ + int64_t written_rows = _written_rows_counter->value(); \ + column->insert_data(reinterpret_cast<const char*>(&written_rows), 0); \ + } else if (i == 2) { \ + int64_t written_data_bytes = _written_data_bytes->value(); \ + column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \ + } else if (i == 3) { \ + std::string file_url; \ + static_cast<void>(_get_file_url(&file_url)); \ + column->insert_data(file_url.c_str(), file_url.size()); \ + } else if (i == 4) { \ + double write_time = _file_write_timer->value() / nons_to_second; \ + std::string 
formatted_write_time = fmt::format("{:.3f}", write_time); \ + column->insert_data(formatted_write_time.c_str(), formatted_write_time.size()); \ + } else if (i == 5) { \ + double write_speed = \ + _get_write_speed(_written_data_bytes->value(), _file_write_timer->value()); \ + std::string formatted_write_speed = fmt::format("{:.2f}", write_speed); \ + column->insert_data(formatted_write_speed.c_str(), formatted_write_speed.size()); \ + } \ _output_block->replace_by_position(i, std::move(column)); #endif @@ -387,6 +415,14 @@ Status VFileResultWriter::_delete_dir() { } } +double VFileResultWriter::_get_write_speed(int64_t write_bytes, int64_t write_time) { + if (write_time <= 0) { + return 0; + } + // KB / s + return ((write_bytes * nons_to_second) / (write_time)) / 1024; +} + Status VFileResultWriter::close(Status exec_status) { Status st = exec_status; if (st.ok()) { diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index bf0a5d3e9e2..8b611d7ceef 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -21,6 +21,7 @@ #include <stddef.h> #include <stdint.h> +#include <cstdint> #include <iosfwd> #include <memory> #include <string> @@ -99,6 +100,7 @@ private: Status _fill_result_block(); // delete the dir of file_path Status _delete_dir(); + double _get_write_speed(int64_t write_bytes, int64_t write_time); RuntimeState* _state; // not owned, set when init const pipeline::ResultFileOptions* _file_opts = nullptr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index d4a7b25ed5a..adf601da1ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -79,17 +79,23 @@ public class OutFileClause { public static final String TOTAL_ROWS = "TotalRows"; public 
static final String FILE_SIZE = "FileSize"; public static final String URL = "URL"; + public static final String WRITE_TIME_SEC = "WriteTimeSec"; + public static final String WRITE_SPEED_KB = "WriteSpeedKB"; static { RESULT_COL_NAMES.add(FILE_NUMBER); RESULT_COL_NAMES.add(TOTAL_ROWS); RESULT_COL_NAMES.add(FILE_SIZE); RESULT_COL_NAMES.add(URL); + RESULT_COL_NAMES.add(WRITE_TIME_SEC); + RESULT_COL_NAMES.add(WRITE_SPEED_KB); RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT)); RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT)); RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT)); RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR)); + RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR)); + RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR)); PARQUET_REPETITION_TYPE_MAP.put("required", TParquetRepetitionType.REQUIRED); PARQUET_REPETITION_TYPE_MAP.put("repeated", TParquetRepetitionType.REPEATED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index b81fb5404c1..3d321cfc9fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -215,6 +215,8 @@ public class ExportTaskExecutor implements TransientTaskExecutor { outfileInfoOneRow.setTotalRows(row.get(OutFileClause.TOTAL_ROWS)); outfileInfoOneRow.setFileSize(row.get(OutFileClause.FILE_SIZE)); outfileInfoOneRow.setUrl(row.get(OutFileClause.URL)); + outfileInfoOneRow.setWriteTime(row.get(OutFileClause.WRITE_TIME_SEC)); + outfileInfoOneRow.setWriteSpeed(row.get(OutFileClause.WRITE_SPEED_KB)); outfileInfo.add(outfileInfoOneRow); } return outfileInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java index b9befd9d326..262bf98adef 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java @@ -34,4 +34,10 @@ public class OutfileInfo { @SerializedName("url") private String url; + + @SerializedName("writeTime") + private String writeTime; + + @SerializedName("writeSpeed") + private String writeSpeed; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java index 631339c3732..3dbd5bc2115 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java @@ -24,8 +24,6 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; @@ -145,39 +143,26 @@ public class ResultFileSink extends DataSink { /** * Construct a tuple for file status, the tuple schema as following: - * | FileNumber | Int | - * | TotalRows | Bigint | - * | FileSize | Bigint | - * | URL | Varchar | + * | FileNumber | Int | + * | TotalRows | Bigint | + * | FileSize | Bigint | + * | URL | Varchar | + * | WriteTimeSec | Varchar | + * | WriteSpeedKB | Varchar | */ public static TupleDescriptor constructFileStatusTupleDesc(DescriptorTable descriptorTable) { TupleDescriptor resultFileStatusTupleDesc = descriptorTable.createTupleDescriptor("result_file_status"); resultFileStatusTupleDesc.setIsMaterialized(true); - SlotDescriptor fileNumber = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc); - fileNumber.setLabel("FileNumber"); - fileNumber.setType(ScalarType.createType(PrimitiveType.INT)); - 
fileNumber.setColumn(new Column("FileNumber", ScalarType.createType(PrimitiveType.INT))); - fileNumber.setIsMaterialized(true); - fileNumber.setIsNullable(false); - SlotDescriptor totalRows = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc); - totalRows.setLabel("TotalRows"); - totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT)); - totalRows.setColumn(new Column("TotalRows", ScalarType.createType(PrimitiveType.BIGINT))); - totalRows.setIsMaterialized(true); - totalRows.setIsNullable(false); - SlotDescriptor fileSize = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc); - fileSize.setLabel("FileSize"); - fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT)); - fileSize.setColumn(new Column("FileSize", ScalarType.createType(PrimitiveType.BIGINT))); - fileSize.setIsMaterialized(true); - fileSize.setIsNullable(false); - SlotDescriptor url = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc); - url.setLabel("URL"); - url.setType(ScalarType.createType(PrimitiveType.VARCHAR)); - url.setColumn(new Column("URL", ScalarType.createType(PrimitiveType.VARCHAR))); - url.setIsMaterialized(true); - url.setIsNullable(false); + for (int i = 0; i < OutFileClause.RESULT_COL_NAMES.size(); ++i) { + SlotDescriptor slotDescriptor = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc); + slotDescriptor.setLabel(OutFileClause.RESULT_COL_NAMES.get(i)); + slotDescriptor.setType(OutFileClause.RESULT_COL_TYPES.get(i)); + slotDescriptor.setColumn(new Column(OutFileClause.RESULT_COL_NAMES.get(i), + OutFileClause.RESULT_COL_TYPES.get(i))); + slotDescriptor.setIsMaterialized(true); + slotDescriptor.setIsNullable(false); + } resultFileStatusTupleDesc.computeStatAndMemLayout(); return resultFileStatusTupleDesc; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org