This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new dbfe8e4 [enhancement] Optimize load CSV file memory allocate (#6174) dbfe8e4 is described below commit dbfe8e4753475ba1bfd490dfdae7003c62e32a86 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Mon Jul 12 09:58:45 2021 +0800 [enhancement] Optimize load CSV file memory allocate (#6174) Optimize load CSV file memory allocate, avoid frequent allocation, may reduce the load time by 40%-50% when large column numbers --- be/src/exec/broker_scanner.cpp | 25 ++++++++++--------- be/src/exec/broker_scanner.h | 6 ++--- be/src/exec/tablet_info.h | 6 ++--- be/test/exec/multi_bytes_separator_test.cpp | 38 +++++++++++++---------------- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 6287705..b275647 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -40,7 +40,7 @@ #include "util/utf8_check.h" #if defined(__x86_64__) - #include "exec/hdfs_file_reader.h" +#include "exec/hdfs_file_reader.h" #endif namespace doris { @@ -75,6 +75,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, _line_delimiter.push_back(static_cast<char>(params.line_delimiter)); _line_delimiter_length = 1; } + _split_values.reserve(sizeof(Slice) * params.src_slot_ids.size()); } BrokerScanner::~BrokerScanner() { @@ -323,7 +324,8 @@ void BrokerScanner::close() { } } -void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) { +void BrokerScanner::split_line(const Slice& line) { + _split_values.clear(); const char* value = line.data; size_t start = 0; // point to the start pos of next col value. size_t curpos = 0; // point to the start pos of separator matching sequence. @@ -348,7 +350,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) { p1++; if (p1 == _value_separator_length) { // Match a separator - values->emplace_back(value + start, curpos - start); + _split_values.emplace_back(value + start, curpos - start); start = curpos + _value_separator_length; curpos = start; p1 = 0; @@ -357,7 +359,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) { } CHECK(curpos == line.size) << curpos << " vs " << line.size; - values->emplace_back(value + start, curpos - start); + _split_values.emplace_back(value + start, curpos - start); } void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p, @@ -460,26 +462,25 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { return false; } - std::vector<Slice> values; - { split_line(line, &values); } + split_line(line); // range of current file const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); const std::vector<std::string>& columns_from_path = range.columns_from_path; - if (values.size() + columns_from_path.size() < _src_slot_descs.size()) { + if (_split_values.size() + columns_from_path.size() < _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is less than schema column number. " - << "actual number: " << values.size() << " column separator: [" + << "actual number: " << _split_values.size() << " column separator: [" << _value_separator << "], " << "line delimiter: [" << _line_delimiter << "], " << "schema number: " << _src_slot_descs.size() << "; "; _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str()); _counter->num_rows_filtered++; return false; - } else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) { + } else if (_split_values.size() + columns_from_path.size() > _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is more than schema column number. " - << "actual number: " << values.size() << " column separator: [" + << "actual number: " << _split_values.size() << " column separator: [" << _value_separator << "], " << "line delimiter: [" << _line_delimiter << "], " << "schema number: " << _src_slot_descs.size() << "; "; @@ -488,9 +489,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { return false; } - for (int i = 0; i < values.size(); ++i) { + for (int i = 0; i < _split_values.size(); ++i) { auto slot_desc = _src_slot_descs[i]; - const Slice& value = values[i]; + const Slice& value = _split_values[i]; if (slot_desc->is_nullable() && is_null(value)) { _src_tuple->set_null(slot_desc->null_indicator_offset()); continue; diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index da41f47..4578a77 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -55,8 +55,7 @@ public: BrokerScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, - const std::vector<ExprContext*>& pre_filter_ctxs, - ScannerCounter* counter); + const std::vector<ExprContext*>& pre_filter_ctxs, ScannerCounter* counter); ~BrokerScanner(); // Open this scanner, will initialize information need to @@ -76,7 +75,7 @@ private: Status open_next_reader(); // Split one text line to values - void split_line(const Slice& line, std::vector<Slice>* values); + void split_line(const Slice& line); void fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p, int new_value_length); @@ -118,6 +117,7 @@ private: // used to hold current StreamLoadPipe std::shared_ptr<StreamLoadPipe> _stream_load_pipe; + std::vector<Slice> _split_values; }; } // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 59c2962..f88caec 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -137,7 +137,7 @@ public: } private: - std::vector<SlotDescriptor*> _slot_descs; + const std::vector<SlotDescriptor*>& _slot_descs; }; // store an olap table's tablet information @@ -175,7 +175,7 @@ private: } OlapTablePartKeyComparator comparator(_partition_slot_descs); const TOlapTablePartition& t_part = _t_param.partitions[0]; - // when list partition, return true if equals. + // when list partition, return true if equals. if (t_part.__isset.in_keys) { bool ret = false; for (auto in_key : part->in_keys) { @@ -185,7 +185,7 @@ private: } } return ret; - } + } return !comparator(key, part->start_key); } diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp index c3fea0e..082e58f 100644 --- a/be/test/exec/multi_bytes_separator_test.cpp +++ b/be/test/exec/multi_bytes_separator_test.cpp @@ -54,7 +54,7 @@ TEST_F(MultiBytesSeparatorTest, normal) { params.line_delimiter_str = "BBB"; params.column_separator_length = 4; params.line_delimiter_length = 3; - + const std::vector<TBrokerRangeDesc> ranges; const std::vector<TNetworkAddress> broker_addresses; const std::vector<ExprContext*> pre_filter_ctxs; @@ -66,31 +66,28 @@ TEST_F(MultiBytesSeparatorTest, normal) { { std::string line = "AAAA"; Slice s(line); - std::vector<Slice> values; - scanner.split_line(s, &values); - ASSERT_EQ(2, values.size()); - ASSERT_EQ(0, values[0].size); - ASSERT_EQ(0, values[1].size); + scanner.split_line(s); + ASSERT_EQ(2, scanner._split_values.size()); + ASSERT_EQ(0, scanner._split_values[0].size); + ASSERT_EQ(0, scanner._split_values[1].size); } // 2. { std::string line = "ABAA"; Slice s(line); - std::vector<Slice> values; - scanner.split_line(s, &values); - ASSERT_EQ(1, values.size()); - ASSERT_EQ(4, values[0].size); + scanner.split_line(s); + ASSERT_EQ(1, scanner._split_values.size()); + ASSERT_EQ(4, scanner._split_values[0].size); } // 3. { std::string line = ""; Slice s(line); - std::vector<Slice> values; - scanner.split_line(s, &values); - ASSERT_EQ(1, values.size()); - ASSERT_EQ(0, values[0].size); + scanner.split_line(s); + ASSERT_EQ(1, scanner._split_values.size()); + ASSERT_EQ(0, scanner._split_values[0].size); } // 4. @@ -98,13 +95,12 @@ TEST_F(MultiBytesSeparatorTest, normal) { // 1234, AAAB, , AA std::string line = "1234AAAAAAABAAAAAAAAAA"; Slice s(line); - std::vector<Slice> values; - scanner.split_line(s, &values); - ASSERT_EQ(4, values.size()); - ASSERT_EQ(4, values[0].size); - ASSERT_EQ(4, values[1].size); - ASSERT_EQ(0, values[2].size); - ASSERT_EQ(2, values[3].size); + scanner.split_line(s); + ASSERT_EQ(4, scanner._split_values.size()); + ASSERT_EQ(4, scanner._split_values[0].size); + ASSERT_EQ(4, scanner._split_values[1].size); + ASSERT_EQ(0, scanner._split_values[2].size); + ASSERT_EQ(2, scanner._split_values[3].size); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org