This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new e023ef5 [Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462) e023ef5 is described below commit e023ef5404493fa0b1aece6b72d1c2202f190dc2 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Tue Mar 9 09:35:39 2021 +0800 [Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462) * [Internal][Support Multibytes Separator] doris-1079 support multi bytes LineDelimiter and ColumnSeparator --- be/CMakeLists.txt | 2 +- be/src/exec/broker_scanner.cpp | 52 ++++++++++++++++------ be/src/exec/broker_scanner.h | 6 ++- be/src/exec/plain_text_line_reader.cpp | 19 ++++---- be/src/exec/plain_text_line_reader.h | 6 ++- be/src/http/action/stream_load.cpp | 3 ++ be/src/http/http_common.h | 1 + be/test/exec/plain_text_line_reader_bzip_test.cpp | 12 ++--- be/test/exec/plain_text_line_reader_gzip_test.cpp | 14 +++--- .../exec/plain_text_line_reader_lz4frame_test.cpp | 12 ++--- be/test/exec/plain_text_line_reader_lzop_test.cpp | 14 +++--- .../plain_text_line_reader_uncompressed_test.cpp | 16 +++---- fe/fe-core/src/main/cup/sql_parser.cup | 10 ++--- .../doris/analysis/CreateRoutineLoadStmt.java | 11 +++-- .../org/apache/doris/analysis/DataDescription.java | 17 ++++--- .../{ColumnSeparator.java => Separator.java} | 9 ++-- .../org/apache/doris/load/BrokerFileGroup.java | 6 +-- .../src/main/java/org/apache/doris/load/Load.java | 21 ++++++--- .../org/apache/doris/load/RoutineLoadDesc.java | 14 ++++-- .../doris/load/routineload/RoutineLoadJob.java | 15 +++++-- .../org/apache/doris/planner/BrokerScanNode.java | 4 ++ .../apache/doris/planner/StreamLoadScanNode.java | 16 ++++++- .../java/org/apache/doris/qe/MultiLoadMgr.java | 6 +-- .../java/org/apache/doris/task/LoadTaskInfo.java | 5 ++- .../java/org/apache/doris/task/StreamLoadTask.java | 21 +++++++-- .../doris/analysis/CreateRoutineLoadStmtTest.java | 4 +- .../apache/doris/analysis/DataDescriptionTest.java | 12 ++--- ...ColumnSeparatorTest.java => SeparatorTest.java} | 26 +++++------ .../load/routineload/KafkaRoutineLoadJobTest.java | 8 ++-- .../load/routineload/RoutineLoadManagerTest.java | 6 +-- gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 6 +++ 32 files changed, 240 insertions(+), 135 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 5d624ac..8be4883 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -584,7 +584,7 @@ FUNCTION(ADD_BE_TEST TEST_NAME) ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp ${ADDITIONAL_FILES}) TARGET_LINK_LIBRARIES(${TEST_FILE_NAME} ${TEST_LINK_LIBS}) - SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control") + SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control" ENABLE_EXPORTS 1) if (NOT "${TEST_DIR_NAME}" STREQUAL "") SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}/${TEST_DIR_NAME}") endif() diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 1844ba8..f732244 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -49,16 +49,28 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, : BaseScanner(state, profile, params, pre_filter_ctxs, counter), _ranges(ranges), _broker_addresses(broker_addresses), - // _splittable(params.splittable), - _value_separator(static_cast<char>(params.column_separator)), - _line_delimiter(static_cast<char>(params.line_delimiter)), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_decompressor(nullptr), _next_range(0), _cur_line_reader_eof(false), _scanner_eof(false), - _skip_next_line(false) {} + _skip_next_line(false) { + if (params.__isset.column_separator_length && params.column_separator_length > 1) { + _value_separator = params.column_separator_str; + _value_separator_length = params.column_separator_length; + } else { + _value_separator.push_back(static_cast<char>(params.column_separator)); + _value_separator_length = 1; + } + if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) { + _line_delimiter = params.line_delimiter_str; + _line_delimiter_length = params.line_delimiter_length; + } else { + _line_delimiter.push_back(static_cast<char>(params.line_delimiter)); + _line_delimiter_length = 1; + } +} BrokerScanner::~BrokerScanner() { close(); @@ -255,7 +267,7 @@ Status BrokerScanner::open_line_reader() { case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor, - size, _line_delimiter); + size, _line_delimiter, _line_delimiter_length); break; default: { std::stringstream ss; @@ -292,16 +304,24 @@ void BrokerScanner::close() { } void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) { - // line-begin char and line-end char are considered to be 'delimiter' const char* value = line.data; - const char* ptr = line.data; - for (size_t i = 0; i < line.size; ++i, ++ptr) { - if (*ptr == _value_separator) { - values->emplace_back(value, ptr - value); - value = ptr + 1; + size_t i = 0; + // TODO improve the performance + while (i < line.size) { + if (i + _value_separator_length <= line.size) { + if (_value_separator.compare(0, _value_separator_length, line.data + i, + _value_separator_length) == 0) { + values->emplace_back(value, line.data + i - value); + value = line.data + i + _value_separator_length; + i += _value_separator_length; + } else { + ++i; + } + } else { + break; } } - values->emplace_back(value, ptr - value); + values->emplace_back(value, line.data + i - value); } void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p, @@ -413,7 +433,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { if (values.size() + columns_from_path.size() < _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is less than schema column number. " - << "actual number: " << values.size() << " sep: " << _value_separator << ", " + << "actual number: " << values.size() << " column separator: [" + << _value_separator << "], " + << "line delimiter: [" << _line_delimiter << "], " << "schema number: " << _src_slot_descs.size() << "; "; _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str()); _counter->num_rows_filtered++; @@ -421,7 +443,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { } else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is more than schema column number. " - << "actual number: " << values.size() << " sep: " << _value_separator << ", " + << "actual number: " << values.size() << " column separator: [" + << _value_separator << "], " + << "line delimiter: [" << _line_delimiter << "], " << "schema number: " << _src_slot_descs.size() << "; "; _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str()); _counter->num_rows_filtered++; diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 2a2563e..da41f47 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -98,8 +98,10 @@ private: std::unique_ptr<TextConverter> _text_converter; - char _value_separator; - char _line_delimiter; + std::string _value_separator; + std::string _line_delimiter; + int _value_separator_length; + int _line_delimiter_length; // Reader FileReader* _cur_file_reader; diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp index 2f0dcc3..d51fa33 100644 --- a/be/src/exec/plain_text_line_reader.cpp +++ b/be/src/exec/plain_text_line_reader.cpp @@ -34,13 +34,14 @@ namespace doris { PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader, Decompressor* decompressor, size_t length, - uint8_t line_delimiter) + const std::string& line_delimiter, size_t line_delimiter_length) : _profile(profile), _file_reader(file_reader), _decompressor(decompressor), _min_length(length), _total_read_bytes(0), _line_delimiter(line_delimiter), + _line_delimiter_length(line_delimiter_length), _input_buf(new uint8_t[INPUT_CHUNK]), _input_buf_size(INPUT_CHUNK), _input_buf_pos(0), @@ -92,7 +93,7 @@ inline bool PlainTextLineReader::update_eof() { uint8_t* PlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* start, size_t len) { // TODO: meanwhile find and save field pos - return (uint8_t*)memmem(start, len, &_line_delimiter, 1); + return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), _line_delimiter_length); } // extend input buf if necessary only when _more_input_bytes > 0 @@ -195,10 +196,12 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e if (pos == nullptr) { // didn't find line delimiter, read more data from decompressor - // 1. point 'offset' to _output_buf_limit - offset = output_buf_read_remaining(); - - // 2. read from file reader + // for multi bytes delimiter we cannot set offset to avoid incomplete + // delimiter + // read from file reader + if (_line_delimiter_length == 1) { + offset = output_buf_read_remaining(); + } extend_output_buf(); if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) { // we still have data in input which is not decompressed. @@ -266,7 +269,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e if (_decompressor != nullptr) { SCOPED_TIMER(_decompress_timer); - // 2. decompress + // decompress size_t input_read_bytes = 0; size_t decompressed_len = 0; _more_input_bytes = 0; @@ -316,7 +319,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e // we found a complete line // ready to return offset = pos - cur_ptr; - found_line_delimiter = 1; + found_line_delimiter = _line_delimiter_length; break; } } // while (!done()) diff --git a/be/src/exec/plain_text_line_reader.h b/be/src/exec/plain_text_line_reader.h index 01ed692..2b8aad6 100644 --- a/be/src/exec/plain_text_line_reader.h +++ b/be/src/exec/plain_text_line_reader.h @@ -29,7 +29,8 @@ class Status; class PlainTextLineReader : public LineReader { public: PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader, - Decompressor* decompressor, size_t length, uint8_t line_delimiter); + Decompressor* decompressor, size_t length, + const std::string& line_delimiter, size_t line_delimiter_length); virtual ~PlainTextLineReader(); @@ -61,7 +62,8 @@ private: Decompressor* _decompressor; size_t _min_length; size_t _total_read_bytes; - uint8_t _line_delimiter; + std::string _line_delimiter; + size_t _line_delimiter_length; // save the data read from file reader uint8_t* _input_buf; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 47bfdb3..fbcbba5 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -348,6 +348,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) { request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR)); } + if (!http_req->header(HTTP_LINE_DELIMITER).empty()) { + request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER)); + } if (!http_req->header(HTTP_PARTITIONS).empty()) { request.__set_partitions(http_req->header(HTTP_PARTITIONS)); request.__set_isTempPartition(false); diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 5681db6..d9bf046 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -28,6 +28,7 @@ static const std::string HTTP_FORMAT_KEY = "format"; static const std::string HTTP_COLUMNS = "columns"; static const std::string HTTP_WHERE = "where"; static const std::string HTTP_COLUMN_SEPARATOR = "column_separator"; +static const std::string HTTP_LINE_DELIMITER = "line_delimiter"; static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio"; static const std::string HTTP_TIMEOUT = "timeout"; static const std::string HTTP_PARTITIONS = "partitions"; diff --git a/be/test/exec/plain_text_line_reader_bzip_test.cpp b/be/test/exec/plain_text_line_reader_bzip_test.cpp index 648e3f8..8a2868a 100644 --- a/be/test/exec/plain_text_line_reader_bzip_test.cpp +++ b/be/test/exec/plain_text_line_reader_bzip_test.cpp @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_normal_use) { st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit) { st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit2) { st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit3) { st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit4) { st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit5) { st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1); const uint8_t* ptr; size_t size; bool eof; diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp index 8f75ef8..2f6d190 100644 --- a/be/test/exec/plain_text_line_reader_gzip_test.cpp +++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, gzip_normal_use) { st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -98,7 +98,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) { st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -133,7 +133,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit) { st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -169,7 +169,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit2) { st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -194,7 +194,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit3) { st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -224,7 +224,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit4) { st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -254,7 +254,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit5) { st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1); const uint8_t* ptr; size_t size; bool eof; diff --git a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp index 157fe12..f78d736 100644 --- a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp +++ b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, lz4_normal_use) { st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit) { st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit2) { st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit3) { st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit4) { st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit5) { st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1); const uint8_t* ptr; size_t size; bool eof; diff --git a/be/test/exec/plain_text_line_reader_lzop_test.cpp b/be/test/exec/plain_text_line_reader_lzop_test.cpp index e06432c..5f83afc 100644 --- a/be/test/exec/plain_text_line_reader_lzop_test.cpp +++ b/be/test/exec/plain_text_line_reader_lzop_test.cpp @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, lzop_normal_use) { st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -96,7 +96,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit) { st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -132,7 +132,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit2) { st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -155,7 +155,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit3) { st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -184,7 +184,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit4) { st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -213,7 +213,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit5) { st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -233,7 +233,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_larger) { st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor); ASSERT_TRUE(st.ok()); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp index 807049d..6568b4f 100644 --- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp +++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_normal_use) { ASSERT_TRUE(st.ok()); ASSERT_TRUE(decompressor == nullptr); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -98,7 +98,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) { ASSERT_TRUE(st.ok()); ASSERT_TRUE(decompressor == nullptr); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -133,7 +133,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit) { ASSERT_TRUE(st.ok()); ASSERT_TRUE(decompressor == nullptr); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -170,7 +170,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit2) { ASSERT_TRUE(st.ok()); ASSERT_TRUE(decompressor == nullptr); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -196,7 +196,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit3) { ASSERT_TRUE(st.ok()); ASSERT_TRUE(decompressor == nullptr); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -227,7 +227,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit4) { ASSERT_TRUE(st.ok()); ASSERT_TRUE(decompressor == nullptr); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -258,7 +258,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit5) { ASSERT_TRUE(st.ok()); ASSERT_TRUE(decompressor == nullptr); - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1); const uint8_t* ptr; size_t size; bool eof; @@ -280,7 +280,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_empty) { ASSERT_TRUE(decompressor == nullptr); // set min length larger than 0 to test - PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 10, '\n'); + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 10, "\n", 1); const uint8_t* ptr; size_t size; bool eof; diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 71bd5bc..c611932 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -439,7 +439,7 @@ nonterminal List<String> opt_col_list, opt_dup_keys, opt_columns_from_path; nonterminal List<ColWithComment> opt_col_with_comment_list, col_with_comment_list; nonterminal ColWithComment col_with_comment; nonterminal List<Expr> opt_col_mapping_list; -nonterminal ColumnSeparator opt_field_term, column_separator; +nonterminal Separator opt_field_term, separator; nonterminal String opt_user_role; nonterminal TablePattern tbl_pattern; nonterminal ResourcePattern resource_pattern; @@ -1444,14 +1444,14 @@ opt_field_term ::= :} | KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep {: - RESULT = new ColumnSeparator(sep); + RESULT = new Separator(sep); :} ; -column_separator ::= +separator ::= KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep {: - RESULT = new ColumnSeparator(sep); + RESULT = new Separator(sep); :} ; @@ -1597,7 +1597,7 @@ opt_load_property_list ::= ; load_property ::= - column_separator:colSep + separator:colSep {: RESULT = colSep; :} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 1960c28..568e938 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -345,7 +345,9 @@ public class CreateRoutineLoadStmt extends DdlStmt { } public void checkLoadProperties() throws UserException { - ColumnSeparator columnSeparator = null; + Separator columnSeparator = null; + // TODO(yangzhengguo01): add line delimiter to properties + Separator lineDelimiter = null; ImportColumnsStmt importColumnsStmt = null; ImportWhereStmt precedingImportWhereStmt = null; ImportWhereStmt importWhereStmt = null; @@ -354,12 +356,12 @@ public class CreateRoutineLoadStmt extends DdlStmt { ImportDeleteOnStmt importDeleteOnStmt = null; if (loadPropertyList != null) { for (ParseNode parseNode : loadPropertyList) { - if (parseNode instanceof ColumnSeparator) { + if (parseNode instanceof Separator) { // check column separator if (columnSeparator != null) { throw new AnalysisException("repeat setting of column separator"); } - columnSeparator = (ColumnSeparator) parseNode; + columnSeparator = (Separator) parseNode; columnSeparator.analyze(null); } else if (parseNode instanceof ImportColumnsStmt) { // check columns info @@ -403,7 +405,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { } } } - routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, precedingImportWhereStmt, importWhereStmt, + routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt, + precedingImportWhereStmt, importWhereStmt, partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType, importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 7e31c2c..31046a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -96,7 +96,7 @@ public class DataDescription { private final String tableName; private final PartitionNames partitionNames; private final List<String> filePaths; - private final ColumnSeparator columnSeparator; + private final Separator columnSeparator; private final String fileFormat; private final boolean isNegative; // column names in the path @@ -112,7 +112,7 @@ public class DataDescription { private List<String> fileFieldNames; // Used for mini load private TNetworkAddress beAddr; - private String lineDelimiter; + private Separator lineDelimiter; private String columnDef; private long backendId; private boolean stripOuterArray = false; @@ -141,7 +141,7 @@ public class DataDescription { PartitionNames partitionNames, List<String> filePaths, List<String> columns, - ColumnSeparator columnSeparator, + Separator columnSeparator, String fileFormat, boolean isNegative, List<Expr> columnMappingList) { @@ -153,7 +153,7 @@ public class DataDescription { PartitionNames partitionNames, List<String> filePaths, List<String> columns, - ColumnSeparator columnSeparator, + Separator columnSeparator, String fileFormat, List<String> columnsFromPath, boolean isNegative, @@ -428,7 +428,7 @@ public class DataDescription { if (columnSeparator == null) { return null; } - return columnSeparator.getColumnSeparator(); + return columnSeparator.getSeparator(); } public boolean isNegative() { @@ -444,10 +444,13 @@ public class DataDescription { } public String getLineDelimiter() { - return lineDelimiter; + if (lineDelimiter == null) { + return null; + } + return lineDelimiter.getSeparator(); } - public void setLineDelimiter(String lineDelimiter) { + public void setLineDelimiter(Separator lineDelimiter) { this.lineDelimiter = lineDelimiter; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java similarity index 93% rename from fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java rename to fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java index 3d2b271..efc61d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java @@ -23,18 +23,18 @@ import com.google.common.base.Strings; import java.io.StringWriter; -public class ColumnSeparator implements ParseNode { +public class Separator implements ParseNode { private static final String HEX_STRING = "0123456789ABCDEF"; private final String oriSeparator; private String separator; - public ColumnSeparator(String separator) { + public Separator(String separator) { this.oriSeparator = separator; this.separator = null; } - public String getColumnSeparator() { + public String getSeparator() { return separator; } @@ -69,7 +69,8 @@ public class ColumnSeparator implements ParseNode { } if (originStr.toUpperCase().startsWith("\\X")) { - String hexStr = originStr.substring(2); + // convert \x01\x02\x0a to 01020a + String hexStr = originStr.replaceAll("(?i)\\\\X", ""); // check hex str if (hexStr.isEmpty()) { throw new AnalysisException("Hex str is empty"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index e608887..272a226 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -17,7 +17,7 @@ package org.apache.doris.load; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; @@ -110,8 +110,8 @@ public class BrokerFileGroup implements Writable { // Used for broker table, no need to parse public BrokerFileGroup(BrokerTable table) throws AnalysisException { this.tableId = table.getId(); - this.valueSeparator = ColumnSeparator.convertSeparator(table.getColumnSeparator()); - this.lineDelimiter = table.getLineDelimiter(); + this.valueSeparator = Separator.convertSeparator(table.getColumnSeparator()); + this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter()); this.isNegative = false; this.filePaths = table.getPaths(); this.fileFormat = table.getFileFormat(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 2cb3273..db7b958 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -22,7 +22,7 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CastExpr; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; @@ -258,9 +258,9 @@ public class Load { // partitions | column names | separator | line delimiter List<String> partitionNames = null; List<String> columnNames = null; - ColumnSeparator columnSeparator = null; + Separator columnSeparator = null; List<String> hllColumnPairList = null; - String lineDelimiter = null; + Separator lineDelimiter = null; String formatType = null; if (params != null) { String specifiedPartitions = params.get(LoadStmt.KEY_IN_PARAM_PARTITIONS); @@ -282,14 +282,25 @@ public class Load { if (columnSeparatorStr.isEmpty()) { columnSeparatorStr = "\t"; } - columnSeparator = new ColumnSeparator(columnSeparatorStr); + columnSeparator = new Separator(columnSeparatorStr); try { columnSeparator.analyze(); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } } - lineDelimiter = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER); + String lineDelimiterStr = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER); + if (lineDelimiterStr != null) { + if (lineDelimiterStr.isEmpty()) { + lineDelimiterStr = "\n"; + } + lineDelimiter = new Separator(lineDelimiterStr); + try { + lineDelimiter.analyze(); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + } formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java index bb7de5c..f9053bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java @@ -18,7 +18,7 @@ package org.apache.doris.load; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnsStmt; import org.apache.doris.analysis.ImportWhereStmt; @@ -30,7 +30,8 @@ import org.apache.doris.load.loadv2.LoadTask; import com.google.common.base.Strings; public class RoutineLoadDesc { - private final ColumnSeparator columnSeparator; + private final Separator columnSeparator; + private final Separator lineDelimiter; private final ImportColumnsStmt columnsInfo; private final ImportWhereStmt precedingFilter; private final ImportWhereStmt wherePredicate; @@ -40,11 +41,12 @@ public class RoutineLoadDesc { private final PartitionNames partitionNames; private final String sequenceColName; - public RoutineLoadDesc(ColumnSeparator columnSeparator, ImportColumnsStmt columnsInfo, + public RoutineLoadDesc(Separator columnSeparator, Separator lineDelimiter, ImportColumnsStmt columnsInfo, ImportWhereStmt precedingFilter, ImportWhereStmt wherePredicate, PartitionNames partitionNames, Expr deleteCondition, LoadTask.MergeType mergeType, String sequenceColName) { this.columnSeparator = columnSeparator; + this.lineDelimiter = lineDelimiter; this.columnsInfo = columnsInfo; this.precedingFilter = precedingFilter; this.wherePredicate = wherePredicate; @@ -54,10 +56,14 @@ public class RoutineLoadDesc { this.sequenceColName = sequenceColName; } - public ColumnSeparator getColumnSeparator() { + public Separator getColumnSeparator() { return columnSeparator; } + public Separator getLineDelimiter() { + return lineDelimiter; + } + public ImportColumnsStmt getColumnsInfo() { return columnsInfo; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index de9ff6d..812c053 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -18,7 +18,7 @@ package org.apache.doris.load.routineload; import org.apache.doris.analysis.AlterRoutineLoadStmt; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; @@ -158,7 +158,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected List<ImportColumnDesc> columnDescs; // optional protected Expr precedingFilter; // optional protected Expr whereExpr; // optional - protected ColumnSeparator columnSeparator; // optional + protected Separator columnSeparator; // optional + protected Separator lineDelimiter; protected int desireTaskConcurrentNum; // optional protected JobState state = JobState.NEED_SCHEDULE; protected LoadDataSourceType dataSourceType; @@ -362,6 +363,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (routineLoadDesc.getColumnSeparator() != null) { columnSeparator = routineLoadDesc.getColumnSeparator(); } + if (routineLoadDesc.getLineDelimiter() != null) { + lineDelimiter = routineLoadDesc.getLineDelimiter(); + } if (routineLoadDesc.getPartitionNames() != null) { partitions = routineLoadDesc.getPartitionNames(); } @@ -485,10 +489,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return whereExpr; } - public ColumnSeparator getColumnSeparator() { + public Separator getColumnSeparator() { return columnSeparator; } + public Separator getLineDelimiter() { + return lineDelimiter; + } + public boolean isStrictMode() { String value = jobProperties.get(LoadStmt.STRICT_MODE); if (value == null) { @@ -1350,6 +1358,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl jobProperties.put("dataFormat", "json"); } else { jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString()); + jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : lineDelimiter.toString()); } jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum)); jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index c5d0b31..e3e7dc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -200,6 +200,10 @@ public class BrokerScanNode extends LoadScanNode { BrokerFileGroup fileGroup = context.fileGroup; params.setColumnSeparator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]); params.setLineDelimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]); + params.setColumnSeparatorStr(fileGroup.getValueSeparator()); + params.setLineDelimiterStr(fileGroup.getLineDelimiter()); + params.setColumnSeparatorLength(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8")).length); + params.setLineDelimiterLength(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8")).length); params.setStrictMode(strictMode); params.setProperties(brokerDesc.getProperties()); deleteCondition = fileGroup.getDeleteCondition(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 24d614b..112fa22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -144,12 +144,24 @@ public class StreamLoadScanNode extends LoadScanNode { createDefaultSmap(analyzer); if (taskInfo.getColumnSeparator() != null) { - String sep = taskInfo.getColumnSeparator().getColumnSeparator(); + String sep = taskInfo.getColumnSeparator().getSeparator(); + params.setColumnSeparatorStr(sep); + params.setColumnSeparatorLength(sep.getBytes(Charset.forName("UTF-8")).length); params.setColumnSeparator(sep.getBytes(Charset.forName("UTF-8"))[0]); } else { params.setColumnSeparator((byte) '\t'); + params.setColumnSeparatorLength(1); + params.setColumnSeparatorStr("\t"); + } + if (taskInfo.getLineDelimiter() != null) { + String sep = taskInfo.getLineDelimiter().getSeparator(); + params.setLineDelimiterStr(sep); + params.setLineDelimiterLength(sep.getBytes(Charset.forName("UTF-8")).length); + params.setLineDelimiter(sep.getBytes(Charset.forName("UTF-8"))[0]); + } else { + params.setLineDelimiter((byte) '\n'); + params.setLineDelimiterLength(1); } - params.setLineDelimiter((byte) '\n'); params.setDestTupleId(desc.getId().asInt()); brokerScanRange.setParams(params); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index cd42c98..7ddd39f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -19,7 +19,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportWhereStmt; @@ -456,7 +456,7 @@ public class MultiLoadMgr { fileSizes.add(pair.second); }); } - ColumnSeparator columnSeparator = null; + Separator columnSeparator = null; PartitionNames partitionNames = null; String fileFormat = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE); boolean isNegative = properties.get(LoadStmt.KEY_IN_PARAM_NEGATIVE) == null ? false : @@ -475,7 +475,7 @@ public class MultiLoadMgr { colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS); String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR); if (columnSeparatorStr != null) { - columnSeparator = new ColumnSeparator(columnSeparatorStr); + columnSeparator = new Separator(columnSeparatorStr); try { columnSeparator.analyze(); } catch (AnalysisException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 983a944..c6dd020 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -17,7 +17,7 @@ package org.apache.doris.task; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.PartitionNames; @@ -51,5 +51,6 @@ public interface LoadTaskInfo { public Expr getPrecedingFilter(); public Expr getWhereExpr(); - public ColumnSeparator getColumnSeparator(); + public Separator getColumnSeparator(); + public Separator getLineDelimiter(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index c79e5c7..57d07f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -17,7 +17,7 @@ package org.apache.doris.task; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.ImportColumnsStmt; @@ -62,7 +62,8 @@ public class StreamLoadTask implements LoadTaskInfo { // optional private List<ImportColumnDesc> columnExprDescs = Lists.newArrayList(); private Expr whereExpr; - private ColumnSeparator columnSeparator; + private Separator columnSeparator; + private Separator lineDelimiter; private PartitionNames partitions; private String path; private boolean negative; @@ -114,10 +115,14 @@ public class StreamLoadTask implements LoadTaskInfo { return whereExpr; } - public ColumnSeparator getColumnSeparator() { + public Separator getColumnSeparator() { return columnSeparator; } + public Separator getLineDelimiter() { + return lineDelimiter; + } + public PartitionNames getPartitions() { return partitions; } @@ -217,6 +222,9 @@ public class StreamLoadTask implements LoadTaskInfo { if (request.isSetColumnSeparator()) { setColumnSeparator(request.getColumnSeparator()); } + if (request.isSetLineDelimiter()) { + setLineDelimiter(request.getLineDelimiter()); + } if (request.isSetPartitions()) { String[] partNames = request.getPartitions().trim().split("\\s*,\\s*"); if (request.isSetIsTempPartition()) { @@ -331,10 +339,15 @@ public class StreamLoadTask implements LoadTaskInfo { } private void setColumnSeparator(String oriSeparator) throws AnalysisException { - columnSeparator = new ColumnSeparator(oriSeparator); + columnSeparator = new Separator(oriSeparator); columnSeparator.analyze(); } + private void setLineDelimiter(String oriLineDelimiter) throws AnalysisException { + lineDelimiter = new Separator(oriLineDelimiter); + lineDelimiter.analyze(); + } + @Override public long getMemLimit() { return execMemLimit; diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java index d964502..8c3a1fa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java @@ -94,7 +94,7 @@ public class CreateRoutineLoadStmtTest { List<String> partitionNameString = Lists.newArrayList(); partitionNameString.add("p1"); PartitionNames partitionNames = new PartitionNames(false, partitionNameString); - ColumnSeparator columnSeparator = new ColumnSeparator(","); + Separator columnSeparator = new Separator(","); // duplicate load property List<ParseNode> loadPropertyList = new ArrayList<>(); @@ -142,7 +142,7 @@ public class CreateRoutineLoadStmtTest { List<String> partitionNameString = Lists.newArrayList(); partitionNameString.add("p1"); PartitionNames partitionNames = new PartitionNames(false, partitionNameString); - ColumnSeparator columnSeparator = new ColumnSeparator(","); + Separator columnSeparator = new Separator(","); // duplicate load property TableName tableName = new TableName(dbName, tableNameString); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index 9197c6b..03f21cc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -119,7 +119,7 @@ public class DataDescriptionTest { Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1)); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null); + Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null); desc.analyze("testDb"); Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1", desc.toString()); Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql()); @@ -127,7 +127,7 @@ public class DataDescriptionTest { Assert.assertEquals(",", desc.getColumnSeparator()); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), - Lists.newArrayList("col1", "col2"), new ColumnSeparator("\t"), + Lists.newArrayList("col1", "col2"), new Separator("\t"), null, true, null); desc.analyze("testDb"); Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" @@ -136,7 +136,7 @@ public class DataDescriptionTest { // hive \x01 column separator desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), - Lists.newArrayList("col1", "col2"), new ColumnSeparator("\\x01"), + Lists.newArrayList("col1", "col2"), new Separator("\\x01"), null, true, null); desc.analyze("testDb"); Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" @@ -220,7 +220,7 @@ public class DataDescriptionTest { Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1)); DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null); + Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null); desc.analyze("testDb"); } @@ -311,7 +311,7 @@ public class DataDescriptionTest { @Test public void testAnalyzeSequenceColumnNormal() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new ColumnSeparator("\t"), + Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new Separator("\t"), null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence"); new Expectations() { { @@ -330,7 +330,7 @@ public class DataDescriptionTest { @Test(expected = AnalysisException.class) public void testAnalyzeSequenceColumnWithoutSourceSequence() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - Lists.newArrayList("k1", "k2", "v1"), new ColumnSeparator("\t"), + Lists.newArrayList("k1", "k2", "v1"), new Separator("\t"), null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence"); new Expectations() { { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java similarity index 70% rename from fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java rename to fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java index 8426992..e499cc1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java @@ -22,47 +22,47 @@ import org.apache.doris.common.AnalysisException; import org.junit.Assert; import org.junit.Test; -public class ColumnSeparatorTest { +public class SeparatorTest { @Test public void testNormal() throws AnalysisException { // \t - ColumnSeparator separator = new ColumnSeparator("\t"); + Separator separator = new Separator("\t"); separator.analyze(); Assert.assertEquals("'\t'", separator.toSql()); - Assert.assertEquals("\t", separator.getColumnSeparator()); + Assert.assertEquals("\t", separator.getSeparator()); // \x01 - separator = new ColumnSeparator("\\x01"); + separator = new Separator("\\x01"); separator.analyze(); Assert.assertEquals("'\\x01'", separator.toSql()); - Assert.assertEquals("\1", separator.getColumnSeparator()); + Assert.assertEquals("\1", separator.getSeparator()); // \x00 \x01 - separator = new ColumnSeparator("\\x0001"); + separator = new Separator("\\x0001"); separator.analyze(); Assert.assertEquals("'\\x0001'", separator.toSql()); - Assert.assertEquals("\0\1", separator.getColumnSeparator()); + Assert.assertEquals("\0\1", separator.getSeparator()); - separator = new ColumnSeparator("|"); + separator = new Separator("|"); separator.analyze(); Assert.assertEquals("'|'", separator.toSql()); - Assert.assertEquals("|", separator.getColumnSeparator()); + Assert.assertEquals("|", separator.getSeparator()); - separator = new ColumnSeparator("\\|"); + separator = new Separator("\\|"); separator.analyze(); Assert.assertEquals("'\\|'", separator.toSql()); - Assert.assertEquals("\\|", separator.getColumnSeparator()); + Assert.assertEquals("\\|", separator.getSeparator()); } @Test(expected = AnalysisException.class) public void testHexFormatError() throws AnalysisException { - ColumnSeparator separator = new ColumnSeparator("\\x0g"); + Separator separator = new Separator("\\x0g"); separator.analyze(); } @Test(expected = AnalysisException.class) public void testHexLengthError() throws AnalysisException { - ColumnSeparator separator = new ColumnSeparator("\\x011"); + Separator separator = new Separator("\\x011"); separator.analyze(); } } \ No newline at end of file diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 5bbab3a..701eb80 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -17,7 +17,7 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.ImportSequenceStmt; import org.apache.doris.analysis.LabelName; @@ -80,7 +80,7 @@ public class KafkaRoutineLoadJobTest { private PartitionNames partitionNames; - private ColumnSeparator columnSeparator = new ColumnSeparator(","); + private Separator columnSeparator = new Separator(","); private ImportSequenceStmt sequenceStmt = new ImportSequenceStmt("source_sequence"); @@ -244,7 +244,7 @@ public class KafkaRoutineLoadJobTest { public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog, @Injectable Database database) throws LoadException { CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); - RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null, partitionNames, null, LoadTask.MergeType.APPEND, null); Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); @@ -269,7 +269,7 @@ public class KafkaRoutineLoadJobTest { @Injectable Database database, @Injectable OlapTable table) throws UserException { CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); - RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, partitionNames, null, + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null, partitionNames, null, LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName()); Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index cd320f8..ea460e4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -17,7 +17,7 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.ParseNode; @@ -82,7 +82,7 @@ public class RoutineLoadManagerTest { String tableNameString = "table1"; TableName tableName = new TableName(dbName, tableNameString); List<ParseNode> loadPropertyList = new ArrayList<>(); - ColumnSeparator columnSeparator = new ColumnSeparator(","); + Separator columnSeparator = new Separator(","); loadPropertyList.add(columnSeparator); Map<String, String> properties = Maps.newHashMap(); properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); @@ -152,7 +152,7 @@ public class RoutineLoadManagerTest { String tableNameString = "table1"; TableName tableName = new TableName(dbName, tableNameString); List<ParseNode> loadPropertyList = new ArrayList<>(); - ColumnSeparator columnSeparator = new ColumnSeparator(","); + Separator columnSeparator = new Separator(","); loadPropertyList.add(columnSeparator); Map<String, String> properties = Maps.newHashMap(); properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 4e6cfd1..a6d7387 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -578,6 +578,7 @@ struct TStreamLoadPutRequest { 29: optional string sequence_col 30: optional bool num_as_string 31: optional bool fuzzy_parse + 32: optional string line_delimiter } struct TStreamLoadPutResult { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 58c09c8..96e1212 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -166,6 +166,12 @@ struct TBrokerScanRangeParams { // strictMode is a boolean // if strict mode is true, the incorrect data (the result of cast is null) will not be loaded 10: optional bool strict_mode + // for multibytes separators + 11: optional i32 column_separator_length = 1; + 12: optional i32 line_delimiter_length = 1; + 13: optional string column_separator_str; + 14: optional string line_delimiter_str; + } // Broker scan range --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org