This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dbfe8e4  [enhancement] Optimize load CSV file memory allocate (#6174)
dbfe8e4 is described below

commit dbfe8e4753475ba1bfd490dfdae7003c62e32a86
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Mon Jul 12 09:58:45 2021 +0800

    [enhancement] Optimize load CSV file memory allocate (#6174)
    
    Optimize memory allocation when loading CSV files by avoiding frequent
    allocations; this may reduce the load time by 40%-50% when the number of
    columns is large.
---
 be/src/exec/broker_scanner.cpp              | 25 ++++++++++---------
 be/src/exec/broker_scanner.h                |  6 ++---
 be/src/exec/tablet_info.h                   |  6 ++---
 be/test/exec/multi_bytes_separator_test.cpp | 38 +++++++++++++----------------
 4 files changed, 36 insertions(+), 39 deletions(-)

diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 6287705..b275647 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -40,7 +40,7 @@
 #include "util/utf8_check.h"
 
 #if defined(__x86_64__)
-    #include "exec/hdfs_file_reader.h"
+#include "exec/hdfs_file_reader.h"
 #endif
 
 namespace doris {
@@ -75,6 +75,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
         _line_delimiter.push_back(static_cast<char>(params.line_delimiter));
         _line_delimiter_length = 1;
     }
+    _split_values.reserve(sizeof(Slice) * params.src_slot_ids.size());
 }
 
 BrokerScanner::~BrokerScanner() {
@@ -323,7 +324,8 @@ void BrokerScanner::close() {
     }
 }
 
-void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
+void BrokerScanner::split_line(const Slice& line) {
+    _split_values.clear();
     const char* value = line.data;
     size_t start = 0;  // point to the start pos of next col value.
     size_t curpos = 0; // point to the start pos of separator matching sequence.
@@ -348,7 +350,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
             p1++;
             if (p1 == _value_separator_length) {
                 // Match a separator
-                values->emplace_back(value + start, curpos - start);
+                _split_values.emplace_back(value + start, curpos - start);
                 start = curpos + _value_separator_length;
                 curpos = start;
                 p1 = 0;
@@ -357,7 +359,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
     }
 
     CHECK(curpos == line.size) << curpos << " vs " << line.size;
-    values->emplace_back(value + start, curpos - start);
+    _split_values.emplace_back(value + start, curpos - start);
 }
 
 void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
@@ -460,26 +462,25 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
         return false;
     }
 
-    std::vector<Slice> values;
-    { split_line(line, &values); }
+    split_line(line);
 
     // range of current file
     const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
     const std::vector<std::string>& columns_from_path = range.columns_from_path;
-    if (values.size() + columns_from_path.size() < _src_slot_descs.size()) {
+    if (_split_values.size() + columns_from_path.size() < _src_slot_descs.size()) {
         std::stringstream error_msg;
         error_msg << "actual column number is less than schema column number. "
-                  << "actual number: " << values.size() << " column separator: ["
+                  << "actual number: " << _split_values.size() << " column separator: ["
                   << _value_separator << "], "
                   << "line delimiter: [" << _line_delimiter << "], "
                   << "schema number: " << _src_slot_descs.size() << "; ";
         _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
         _counter->num_rows_filtered++;
         return false;
-    } else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) {
+    } else if (_split_values.size() + columns_from_path.size() > _src_slot_descs.size()) {
         std::stringstream error_msg;
         error_msg << "actual column number is more than schema column number. "
-                  << "actual number: " << values.size() << " column separator: ["
+                  << "actual number: " << _split_values.size() << " column separator: ["
                   << _value_separator << "], "
                   << "line delimiter: [" << _line_delimiter << "], "
                   << "schema number: " << _src_slot_descs.size() << "; ";
@@ -488,9 +489,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
         return false;
     }
 
-    for (int i = 0; i < values.size(); ++i) {
+    for (int i = 0; i < _split_values.size(); ++i) {
         auto slot_desc = _src_slot_descs[i];
-        const Slice& value = values[i];
+        const Slice& value = _split_values[i];
         if (slot_desc->is_nullable() && is_null(value)) {
             _src_tuple->set_null(slot_desc->null_indicator_offset());
             continue;
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index da41f47..4578a77 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -55,8 +55,7 @@ public:
     BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                   const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges,
                   const std::vector<TNetworkAddress>& broker_addresses,
-                                 const std::vector<ExprContext*>& pre_filter_ctxs,
-                  ScannerCounter* counter);
+                  const std::vector<ExprContext*>& pre_filter_ctxs, ScannerCounter* counter);
     ~BrokerScanner();
 
     // Open this scanner, will initialize information need to
@@ -76,7 +75,7 @@ private:
     Status open_next_reader();
 
     // Split one text line to values
-    void split_line(const Slice& line, std::vector<Slice>* values);
+    void split_line(const Slice& line);
 
     void fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
                                 int new_value_length);
@@ -118,6 +117,7 @@ private:
 
     // used to hold current StreamLoadPipe
     std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
+    std::vector<Slice> _split_values;
 };
 
 } // namespace doris
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 59c2962..f88caec 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -137,7 +137,7 @@ public:
     }
 
 private:
-    std::vector<SlotDescriptor*> _slot_descs;
+    const std::vector<SlotDescriptor*>& _slot_descs;
 };
 
 // store an olap table's tablet information
@@ -175,7 +175,7 @@ private:
         }
         OlapTablePartKeyComparator comparator(_partition_slot_descs);
         const TOlapTablePartition& t_part = _t_param.partitions[0];
-        // when list partition, return true if equals. 
+        // when list partition, return true if equals.
         if (t_part.__isset.in_keys) {
             bool ret = false;
             for (auto in_key : part->in_keys) {
@@ -185,7 +185,7 @@ private:
                 }
             }
             return ret;
-        } 
+        }
         return !comparator(key, part->start_key);
     }
 
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
index c3fea0e..082e58f 100644
--- a/be/test/exec/multi_bytes_separator_test.cpp
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -54,7 +54,7 @@ TEST_F(MultiBytesSeparatorTest, normal) {
     params.line_delimiter_str = "BBB";
     params.column_separator_length = 4;
     params.line_delimiter_length = 3;
-    
+
     const std::vector<TBrokerRangeDesc> ranges;
     const std::vector<TNetworkAddress> broker_addresses;
     const std::vector<ExprContext*> pre_filter_ctxs;
@@ -66,31 +66,28 @@ TEST_F(MultiBytesSeparatorTest, normal) {
     {
         std::string line = "AAAA";
         Slice s(line);
-        std::vector<Slice> values;
-        scanner.split_line(s, &values);
-        ASSERT_EQ(2, values.size());
-        ASSERT_EQ(0, values[0].size);
-        ASSERT_EQ(0, values[1].size);
+        scanner.split_line(s);
+        ASSERT_EQ(2, scanner._split_values.size());
+        ASSERT_EQ(0, scanner._split_values[0].size);
+        ASSERT_EQ(0, scanner._split_values[1].size);
     }
 
     // 2.
     {
         std::string line = "ABAA";
         Slice s(line);
-        std::vector<Slice> values;
-        scanner.split_line(s, &values);
-        ASSERT_EQ(1, values.size());
-        ASSERT_EQ(4, values[0].size);
+        scanner.split_line(s);
+        ASSERT_EQ(1, scanner._split_values.size());
+        ASSERT_EQ(4, scanner._split_values[0].size);
     }
 
     // 3.
     {
         std::string line = "";
         Slice s(line);
-        std::vector<Slice> values;
-        scanner.split_line(s, &values);
-        ASSERT_EQ(1, values.size());
-        ASSERT_EQ(0, values[0].size);
+        scanner.split_line(s);
+        ASSERT_EQ(1, scanner._split_values.size());
+        ASSERT_EQ(0, scanner._split_values[0].size);
     }
 
     // 4.
@@ -98,13 +95,12 @@ TEST_F(MultiBytesSeparatorTest, normal) {
         // 1234, AAAB, , AA
         std::string line = "1234AAAAAAABAAAAAAAAAA";
         Slice s(line);
-        std::vector<Slice> values;
-        scanner.split_line(s, &values);
-        ASSERT_EQ(4, values.size());
-        ASSERT_EQ(4, values[0].size);
-        ASSERT_EQ(4, values[1].size);
-        ASSERT_EQ(0, values[2].size);
-        ASSERT_EQ(2, values[3].size);
+        scanner.split_line(s);
+        ASSERT_EQ(4, scanner._split_values.size());
+        ASSERT_EQ(4, scanner._split_values[0].size);
+        ASSERT_EQ(4, scanner._split_values[1].size);
+        ASSERT_EQ(0, scanner._split_values[2].size);
+        ASSERT_EQ(2, scanner._split_values[3].size);
     }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to