This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bf51d5b82a1 [feature](move-memtable) support for inverted index file (#35891) bf51d5b82a1 is described below commit bf51d5b82a193d3515d730d673e99585e4867b24 Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Fri Jul 5 10:03:11 2024 +0800 [feature](move-memtable) support for inverted index file (#35891) support for inverted index file in move-memtable --- be/src/io/fs/stream_sink_file_writer.cpp | 12 +- be/src/io/fs/stream_sink_file_writer.h | 5 +- be/src/olap/delta_writer_v2.cpp | 1 + be/src/olap/rowset/beta_rowset_writer.cpp | 17 +- be/src/olap/rowset/beta_rowset_writer.h | 3 +- be/src/olap/rowset/beta_rowset_writer_v2.cpp | 6 +- be/src/olap/rowset/beta_rowset_writer_v2.h | 3 +- be/src/olap/rowset/rowset_writer.h | 4 +- be/src/olap/rowset/rowset_writer_context.h | 3 + be/src/olap/rowset/segment_creator.cpp | 40 +++-- be/src/olap/rowset/segment_creator.h | 10 +- .../segment_v2/inverted_index_file_writer.cpp | 12 +- .../rowset/segment_v2/inverted_index_file_writer.h | 9 +- .../segment_v2/inverted_index_fs_directory.cpp | 101 +++++++++++ .../segment_v2/inverted_index_fs_directory.h | 3 + be/src/olap/rowset/segment_v2/segment_writer.cpp | 11 +- be/src/olap/rowset/segment_v2/segment_writer.h | 4 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 11 +- .../rowset/segment_v2/vertical_segment_writer.h | 3 +- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- be/src/runtime/load_stream.cpp | 14 +- be/src/runtime/load_stream.h | 2 +- be/src/runtime/load_stream_writer.cpp | 113 +++++++++---- be/src/runtime/load_stream_writer.h | 10 +- be/src/util/thrift_util.cpp | 9 +- be/src/util/thrift_util.h | 2 +- be/src/vec/sink/load_stream_stub.cpp | 3 +- be/src/vec/sink/load_stream_stub.h | 2 +- be/test/io/fs/stream_sink_file_writer_test.cpp | 3 +- .../org/apache/doris/planner/OlapTableSink.java | 1 + gensrc/proto/internal_service.proto | 2 + gensrc/proto/olap_common.proto | 5 + gensrc/thrift/Descriptors.thrift | 1 + .../data/inverted_index_p0/load/test_insert.out | 73 ++++++++ .../inverted_index_p0/load/test_stream_load.out | 45 +++++ .../test_index_lowercase_fault_injection.out | 0 .../test_stream_load_with_inverted_index.out | 43 +++++ ..._writer_v2_back_pressure_fault_injection.groovy | 3 + .../test_load_stream_fault_injection.groovy | 20 +-- .../inverted_index_p0/load/test_insert.groovy | 81 +++++++++ .../inverted_index_p0/load/test_spark_load.groovy | 174 +++++++++++++++++++ .../inverted_index_p0/load/test_stream_load.groovy | 150 +++++++++++++++++ .../test_index_lowercase_fault_injection.groovy | 2 +- .../test_stream_load_with_inverted_index.groovy | 185 +++++++++++++++++++++ .../test_insert_into_index.groovy | 75 +++++++++ .../load_p0/http_stream/test_http_stream.groovy | 8 +- .../load_p0/mysql_load/test_mysql_load.groovy | 4 +- .../mysql_load/test_mysql_load_big_file.groovy | 4 +- 48 files changed, 1187 insertions(+), 107 deletions(-) diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index bca548be9a2..1d7f823af10 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -28,15 +28,17 @@ namespace doris::io { void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, int64_t index_id, - int64_t tablet_id, int32_t segment_id) { + int64_t tablet_id, int32_t segment_id, FileType file_type) { VLOG_DEBUG << "init stream writer, load id(" << UniqueId(load_id).to_string() << "), partition id(" << partition_id << "), 
index id(" << index_id - << "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")"; + << "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")" + << ", file_type(" << file_type << ")"; _load_id = load_id; _partition_id = partition_id; _index_id = index_id; _tablet_id = tablet_id; _segment_id = segment_id; + _file_type = file_type; } Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { @@ -47,7 +49,7 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id - << ", data_length: " << bytes_req; + << ", data_length: " << bytes_req << "file_type" << _file_type; std::span<const Slice> slices {data, data_cnt}; size_t stream_index = 0; @@ -67,7 +69,7 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { }); if (!skip_stream) { st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, - _bytes_appended, slices); + _bytes_appended, slices, false, _file_type); } DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { if (stream_index >= 2) { @@ -140,7 +142,7 @@ Status StreamSinkFileWriter::_finalize() { bool ok = false; for (auto& stream : _streams) { auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, - _bytes_appended, {}, true); + _bytes_appended, {}, true, _file_type); ok = ok || st.ok(); if (!st.ok()) { LOG(WARNING) << "failed to send segment eos to backend " << stream->dst_id() diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h index 0d621e8b4c1..0950039077b 100644 --- a/be/src/io/fs/stream_sink_file_writer.h +++ b/be/src/io/fs/stream_sink_file_writer.h @@ -18,7 +18,7 @@ #pragma once #include <brpc/stream.h> -#include <gen_cpp/internal_service.pb.h> +#include <gen_cpp/olap_common.pb.h> #include <queue> @@ -40,7 +40,7 @@ public: : _streams(std::move(streams)) {} void init(PUniqueId load_id, int64_t partition_id, int64_t index_id, int64_t tablet_id, - int32_t segment_id); + int32_t segment_id, FileType file_type = FileType::SEGMENT_FILE); Status appendv(const Slice* data, size_t data_cnt) override; @@ -69,6 +69,7 @@ private: int32_t _segment_id; size_t _bytes_appended = 0; State _state {State::OPENED}; + FileType _file_type {FileType::SEGMENT_FILE}; }; } // namespace io diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 3f2f7bf99fa..805f072f6e6 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -123,6 +123,7 @@ Status DeltaWriterV2::init() { context.rowset_id = ExecEnv::GetInstance()->storage_engine().next_rowset_id(); context.data_dir = nullptr; context.partial_update_info = _partial_update_info; + context.memtable_on_sink_support_index_v2 = true; _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index d418a89a361..17801ec16fd 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -831,10 +831,19 @@ Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, return Status::OK(); } -Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, - io::FileWriterPtr& file_writer) { - auto path = _context.segment_path(segment_id); - 
return _create_file_writer(path, file_writer); +Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, + FileType file_type) { + auto segment_path = _context.segment_path(segment_id); + if (file_type == FileType::INVERTED_INDEX_FILE) { + std::string prefix = + std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)}; + std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); + return _create_file_writer(index_path, file_writer); + } else if (file_type == FileType::SEGMENT_FILE) { + return _create_file_writer(segment_path, file_writer); + } + return Status::Error<ErrorCode::INTERNAL_ERROR>( + fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); } Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index d729a15df32..98bb43c6092 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -98,7 +98,8 @@ public: Status add_rowset(RowsetSharedPtr rowset) override; Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override; - Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override; + Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer, + FileType file_type = FileType::SEGMENT_FILE) override; Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat, TabletSchemaSPtr flush_schema) override; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index 1fee4e04034..3ebe331cfc1 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -69,14 +69,14 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context return Status::OK(); } -Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer) { +Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, + FileType file_type) { auto partition_id = _context.partition_id; auto index_id = _context.index_id; auto tablet_id = _context.tablet_id; auto load_id = _context.load_id; - auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams); - stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id); + stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id, file_type); file_writer = std::move(stream_writer); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index e406a2037a7..89bd3045089 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -80,7 +80,8 @@ public: "add_rowset_for_linked_schema_change is not implemented"); } - Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override; + Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer, + FileType file_type = FileType::SEGMENT_FILE) override; Status flush() override { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>("flush is not implemented"); diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 75a592cf98d..6861b8ab7e2 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/internal_service.pb.h> #include 
<gen_cpp/olap_file.pb.h> #include <gen_cpp/types.pb.h> @@ -89,7 +90,8 @@ public: // Precondition: the input `rowset` should have the same type of the rowset we're building virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) = 0; - virtual Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) { + virtual Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer, + FileType file_type = FileType::SEGMENT_FILE) { return Status::NotSupported("RowsetWriter does not support create_file_writer"); } diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 488030993e1..0130916bfb4 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -88,6 +88,9 @@ struct RowsetWriterContext { std::shared_ptr<FileWriterCreator> file_writer_creator; std::shared_ptr<SegmentCollector> segment_collector; + // memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file + bool memtable_on_sink_support_index_v2 = false; + /// begin file cache opts bool write_file_cache = false; bool is_hot_data = false; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 738c6e2f9f9..82313f988cb 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -134,8 +134,17 @@ Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWrit Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer, int32_t segment_id, bool no_compression) { - io::FileWriterPtr file_writer; - RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer)); + io::FileWriterPtr segment_file_writer; + RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer)); + + io::FileWriterPtr inverted_file_writer; + if (_context.tablet_schema->has_inverted_index() && + _context.tablet_schema->get_inverted_index_storage_format() >= + InvertedIndexStorageFormatPB::V2 && + _context.memtable_on_sink_support_index_v2) { + RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer, + FileType::INVERTED_INDEX_FILE)); + } segment_v2::SegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; @@ -146,9 +155,10 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen } writer = std::make_unique<segment_v2::SegmentWriter>( - file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, - _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context); - RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); + segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, + _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context, + std::move(inverted_file_writer)); + RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer))); auto s = writer->init(); if (!s.ok()) { LOG(WARNING) << "failed to init segment writer: " << s.to_string(); @@ -161,8 +171,17 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen Status SegmentFlusher::_create_segment_writer( std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id, bool no_compression) { - io::FileWriterPtr file_writer; - 
RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer)); + io::FileWriterPtr segment_file_writer; + RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer)); + + io::FileWriterPtr inverted_file_writer; + if (_context.tablet_schema->has_inverted_index() && + _context.tablet_schema->get_inverted_index_storage_format() >= + InvertedIndexStorageFormatPB::V2 && + _context.memtable_on_sink_support_index_v2) { + RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer, + FileType::INVERTED_INDEX_FILE)); + } segment_v2::VerticalSegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; @@ -173,9 +192,10 @@ Status SegmentFlusher::_create_segment_writer( } writer = std::make_unique<segment_v2::VerticalSegmentWriter>( - file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, - _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context); - RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); + segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, + _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context, + std::move(inverted_file_writer)); + RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer))); auto s = writer->init(); if (!s.ok()) { LOG(WARNING) << "failed to init segment writer: " << s.to_string(); diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 3226ab0adf8..97a8f177ad9 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -17,9 +17,11 @@ #pragma once +#include <gen_cpp/internal_service.pb.h> #include <gen_cpp/olap_file.pb.h> #include <string> +#include <typeinfo> #include <unordered_map> #include <vector> @@ -49,7 +51,8 @@ class FileWriterCreator { public: virtual ~FileWriterCreator() = default; - virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) = 0; + virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer, + FileType file_type = FileType::SEGMENT_FILE) = 0; }; template <class T> @@ -57,8 +60,9 @@ class FileWriterCreatorT : public FileWriterCreator { public: explicit FileWriterCreatorT(T* t) : _t(t) {} - Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) override { - return _t->create_file_writer(segment_id, file_writer); + Status create(uint32_t segment_id, io::FileWriterPtr& file_writer, + FileType file_type = FileType::SEGMENT_FILE) override { + return _t->create_file_writer(segment_id, file_writer, file_type); } private: diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index cdd26fecf87..22d494e5132 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -341,9 +341,15 @@ size_t InvertedIndexFileWriter::write_v2() { io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str()); - - auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>( - out_dir->createOutput(index_path.filename().c_str())); + std::unique_ptr<lucene::store::IndexOutput> compound_file_output; + // idx v2 writer != nullptr means memtable on sink node now + if (_idx_v2_writer != 
nullptr) { + compound_file_output = std::unique_ptr<lucene::store::IndexOutput>( + out_dir->createOutputV2(_idx_v2_writer.get())); + } else { + compound_file_output = std::unique_ptr<lucene::store::IndexOutput>( + out_dir->createOutput(index_path.filename().c_str())); + } // Write the version number compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h index 7ec71c0b38a..0d82504c07f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h @@ -26,6 +26,7 @@ #include <vector> #include "io/fs/file_system.h" +#include "io/fs/file_writer.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" namespace doris { @@ -46,12 +47,14 @@ class InvertedIndexFileWriter { public: InvertedIndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix, std::string rowset_id, int64_t seg_id, - InvertedIndexStorageFormatPB storage_format) + InvertedIndexStorageFormatPB storage_format, + io::FileWriterPtr file_writer = nullptr) : _fs(std::move(fs)), _index_path_prefix(std::move(index_path_prefix)), _rowset_id(std::move(rowset_id)), _seg_id(seg_id), - _storage_format(storage_format) {} + _storage_format(storage_format), + _idx_v2_writer(std::move(file_writer)) {} Result<DorisFSDirectory*> open(const TabletIndex* index_meta); Status delete_index(const TabletIndex* index_meta); @@ -76,6 +79,8 @@ private: int64_t _seg_id; InvertedIndexStorageFormatPB _storage_format; size_t _file_size = 0; + // write to disk or stream + io::FileWriterPtr _idx_v2_writer; }; } // namespace segment_v2 } // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index 54d484d1199..0443bf345ba 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -98,6 +98,21 @@ public: int64_t length() const override; }; +class DorisFSDirectory::FSIndexOutputV2 : public lucene::store::BufferedIndexOutput { +private: + io::FileWriter* _index_v2_file_writer = nullptr; + +protected: + void flushBuffer(const uint8_t* b, const int32_t size) override; + +public: + FSIndexOutputV2() = default; + void init(io::FileWriter* file_writer); + ~FSIndexOutputV2() override; + void close() override; + int64_t length() const override; +}; + bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const char* path, IndexInput*& ret, CLuceneError& error, int32_t buffer_size) { @@ -333,6 +348,86 @@ int64_t DorisFSDirectory::FSIndexOutput::length() const { return _writer->bytes_appended(); } +void DorisFSDirectory::FSIndexOutputV2::init(io::FileWriter* file_writer) { + _index_v2_file_writer = file_writer; + DBUG_EXECUTE_IF( + "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_" + "init", + { + _CLTHROWA(CL_ERR_IO, + "debug point: test throw error in fsindexoutput init mock error"); + }) +} + +DorisFSDirectory::FSIndexOutputV2::~FSIndexOutputV2() {} + +void DorisFSDirectory::FSIndexOutputV2::flushBuffer(const uint8_t* b, const int32_t size) { + if (_index_v2_file_writer != nullptr && b != nullptr && size > 0) { + Slice data {b, (size_t)size}; + DBUG_EXECUTE_IF( + "DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_" + "flushBuffer", + { return; }) + Status st = 
_index_v2_file_writer->append(data); + DBUG_EXECUTE_IF( + "DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer", { + st = Status::Error<doris::ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "flush buffer mock error"); + }) + if (!st.ok()) { + LOG(WARNING) << "File IO Write error: " << st.to_string(); + _CLTHROWA(CL_ERR_IO, "writer append data when flushBuffer error"); + } + } else { + if (_index_v2_file_writer == nullptr) { + LOG(WARNING) << "File writer is nullptr in DorisFSDirectory::FSIndexOutputV2, " + "ignore flush."; + _CLTHROWA(CL_ERR_IO, "flushBuffer error, _index_v2_file_writer = nullptr"); + } else if (b == nullptr) { + LOG(WARNING) << "buffer is nullptr when flushBuffer in " + "DorisFSDirectory::FSIndexOutput"; + } + } +} + +void DorisFSDirectory::FSIndexOutputV2::close() { + try { + BufferedIndexOutput::close(); + DBUG_EXECUTE_IF( + "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_" + "close", + { + _CLTHROWA(CL_ERR_IO, + "debug point: test throw error in bufferedindexoutput close"); + }) + } catch (CLuceneError& err) { + LOG(WARNING) << "FSIndexOutputV2 close, BufferedIndexOutput close error: " << err.what(); + if (err.number() == CL_ERR_IO) { + LOG(WARNING) << "FSIndexOutputV2 close, BufferedIndexOutput close IO error: " + << err.what(); + } + _CLTHROWA(err.number(), err.what()); + } + if (_index_v2_file_writer) { + auto ret = _index_v2_file_writer->close(); + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error", + { ret = Status::Error<INTERNAL_ERROR>("writer close status error"); }) + if (!ret.ok()) { + LOG(WARNING) << "FSIndexOutputV2 close, stream sink file writer close error: " + << ret.to_string(); + _CLTHROWA(CL_ERR_IO, ret.to_string().c_str()); + } + } else { + LOG(WARNING) << "File writer is nullptr, ignore finalize and close."; + _CLTHROWA(CL_ERR_IO, "close file writer error, _index_v2_file_writer = nullptr"); + } +} + +int64_t DorisFSDirectory::FSIndexOutputV2::length() const { + CND_PRECONDITION(_index_v2_file_writer != nullptr, "file is not open"); + return _index_v2_file_writer->bytes_appended(); +} + DorisFSDirectory::DorisFSDirectory() { filemode = 0644; this->lockFactory = nullptr; @@ -495,6 +590,12 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) { return ret; } +lucene::store::IndexOutput* DorisFSDirectory::createOutputV2(io::FileWriter* file_writer) { + auto* ret = _CLNEW FSIndexOutputV2(); + ret->init(file_writer); + return ret; +} + std::string DorisFSDirectory::toString() const { return std::string("DorisFSDirectory@") + this->directory; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h index d9069d66ef2..b3e0352d7ad 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h @@ -60,9 +60,11 @@ protected: public: class FSIndexOutput; + class FSIndexOutputV2; class FSIndexInput; friend class DorisFSDirectory::FSIndexOutput; + friend class DorisFSDirectory::FSIndexOutputV2; friend class DorisFSDirectory::FSIndexInput; const io::FileSystemSPtr& getFileSystem() { return _fs; } @@ -78,6 +80,7 @@ public: void renameFile(const char* from, const char* to) override; void touchFile(const char* name) override; lucene::store::IndexOutput* createOutput(const char* name) override; + lucene::store::IndexOutput* createOutputV2(io::FileWriter* file_writer); void close() override; std::string toString() 
const override; static const char* getClassName(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 729e2500384..d22e1060dd3 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -43,8 +43,9 @@ #include "olap/key_coder.h" #include "olap/olap_common.h" #include "olap/primary_key_index.h" -#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep -#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext +#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep +#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext +#include "olap/rowset/segment_creator.h" #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter #include "olap/rowset/segment_v2/inverted_index_file_writer.h" #include "olap/rowset/segment_v2/inverted_index_writer.h" @@ -85,7 +86,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, uint32_t max_row_per_segment, const SegmentWriterOptions& opts, - std::shared_ptr<MowContext> mow_context) + std::shared_ptr<MowContext> mow_context, + io::FileWriterPtr inverted_file_writer) : _segment_id(segment_id), _tablet_schema(std::move(tablet_schema)), _tablet(std::move(tablet)), @@ -140,7 +142,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, std::string {InvertedIndexDescriptor::get_index_file_path_prefix( file_writer->path().c_str())}, _opts.rowset_ctx->rowset_id.to_string(), segment_id, - _tablet_schema->get_inverted_index_storage_format()); + _tablet_schema->get_inverted_index_storage_format(), + std::move(inverted_file_writer)); } } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 92af12d4da6..9c667ee92fc 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -85,7 +85,8 @@ public: explicit SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, uint32_t max_row_per_segment, const SegmentWriterOptions& opts, - std::shared_ptr<MowContext> mow_context); + std::shared_ptr<MowContext> mow_context, + io::FileWriterPtr inverted_file_writer = nullptr); ~SegmentWriter(); Status init(); @@ -197,7 +198,6 @@ private: // Not owned. 
owned by RowsetWriter or SegmentFlusher io::FileWriter* _file_writer = nullptr; std::unique_ptr<InvertedIndexFileWriter> _inverted_index_file_writer; - SegmentFooterPB _footer; size_t _num_key_columns; size_t _num_short_key_columns; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 0930325d6d8..5d2d6ac0769 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -42,8 +42,9 @@ #include "olap/olap_common.h" #include "olap/partial_update_info.h" #include "olap/primary_key_index.h" -#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep -#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext +#include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep +#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext +#include "olap/rowset/segment_creator.h" #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_file_writer.h" @@ -83,7 +84,8 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, uint32_t max_row_per_segment, const VerticalSegmentWriterOptions& opts, - std::shared_ptr<MowContext> mow_context) + std::shared_ptr<MowContext> mow_context, + io::FileWriterPtr inverted_file_writer) : _segment_id(segment_id), _tablet_schema(std::move(tablet_schema)), _tablet(std::move(tablet)), @@ -114,7 +116,8 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 std::string {InvertedIndexDescriptor::get_index_file_path_prefix( _opts.rowset_ctx->segment_path(segment_id))}, _opts.rowset_ctx->rowset_id.to_string(), segment_id, - _tablet_schema->get_inverted_index_storage_format()); + _tablet_schema->get_inverted_index_storage_format(), + std::move(inverted_file_writer)); } } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 3809a8301d5..8068b3e44be 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -82,7 +82,8 @@ public: TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, uint32_t max_row_per_segment, const VerticalSegmentWriterOptions& opts, - std::shared_ptr<MowContext> mow_context); + std::shared_ptr<MowContext> mow_context, + io::FileWriterPtr inverted_file_writer = nullptr); ~VerticalSegmentWriter(); VerticalSegmentWriter(const VerticalSegmentWriter&) = delete; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 6980f0be3f2..8138c7594b8 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -962,7 +962,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: case TDataSinkType::OLAP_TABLE_SINK: { if (state->query_options().enable_memtable_on_sink_node && - !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) && + !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) { _sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc, output_exprs)); diff --git a/be/src/runtime/load_stream.cpp 
b/be/src/runtime/load_stream.cpp index fc8a6cd1b61..0a35bf6008e 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -136,15 +136,23 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data // Each sender sends data in one segment sequential, so we also do not // need a lock here. bool eos = header.segment_eos(); + FileType file_type = header.file_type(); uint32_t new_segid = mapping->at(segid); DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); butil::IOBuf buf = data->movable(); - auto flush_func = [this, new_segid, eos, buf, header]() { + auto flush_func = [this, new_segid, eos, buf, header, file_type]() { signal::set_signal_task_id(_load_id); g_load_stream_flush_running_threads << -1; - auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf); + auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); if (eos && st.ok()) { - st = _load_stream_writer->close_segment(new_segid); + if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { + st = _load_stream_writer->close_writer(new_segid, file_type); + } else { + st = Status::InternalError( + "appent data failed, file type error, file type = {}, " + "segment_id={}", + file_type, new_segid); + } } if (!st.ok() && _failed_st->ok()) { _failed_st = std::make_shared<Status>(st); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index b2635698379..9e6e0e36a4b 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -18,7 +18,7 @@ #pragma once #include <bthread/mutex.h> -#include <gen_cpp/internal_service.pb.h> +#include <gen_cpp/olap_common.pb.h> #include <condition_variable> #include <memory> diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index bc02be98bc4..925229a43ce 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -94,28 +94,31 @@ Status LoadStreamWriter::init() { return Status::OK(); } -Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf) { +Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf, + FileType file_type) { SCOPED_ATTACH_TASK(_query_thread_context); io::FileWriter* file_writer = nullptr; + auto& file_writers = + file_type == FileType::SEGMENT_FILE ? 
_segment_file_writers : _inverted_file_writers; { std::lock_guard lock_guard(_lock); DCHECK(_is_init); - if (segid >= _segment_file_writers.size()) { - for (size_t i = _segment_file_writers.size(); i <= segid; i++) { + if (segid >= file_writers.size()) { + for (size_t i = file_writers.size(); i <= segid; i++) { Status st; io::FileWriterPtr file_writer; - st = _rowset_writer->create_file_writer(i, file_writer); + st = _rowset_writer->create_file_writer(i, file_writer, file_type); if (!st.ok()) { _is_canceled = true; return st; } - _segment_file_writers.push_back(std::move(file_writer)); + file_writers.push_back(std::move(file_writer)); g_load_stream_file_writer_cnt << 1; } } // TODO: IOBuf to Slice - file_writer = _segment_file_writers[segid].get(); + file_writer = file_writers[segid].get(); } DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; }); VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid; @@ -130,25 +133,32 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB return file_writer->append(buf.to_string()); } -Status LoadStreamWriter::close_segment(uint32_t segid) { +Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) { SCOPED_ATTACH_TASK(_query_thread_context); io::FileWriter* file_writer = nullptr; + auto& file_writers = + file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers; { std::lock_guard lock_guard(_lock); - DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", { _is_init = false; }); + DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.uninited_writer", { _is_init = false; }); if (!_is_init) { - return Status::Corruption("close_segment failed, LoadStreamWriter is not inited"); + return Status::Corruption("close_writer failed, LoadStreamWriter is not inited"); } - DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.bad_segid", - { segid = _segment_file_writers.size(); }); - if (segid >= _segment_file_writers.size()) { - return Status::Corruption("close_segment failed, segment {} is never opened", segid); + DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid", + { segid = file_writers.size(); }); + if (segid >= file_writers.size()) { + return Status::Corruption( + "close_writer failed, file {} is never opened, file type is {}", segid, + file_type); } - file_writer = _segment_file_writers[segid].get(); + file_writer = file_writers[segid].get(); } - DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.null_file_writer", { file_writer = nullptr; }); + + DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { file_writer = nullptr; }); if (file_writer == nullptr) { - return Status::Corruption("close_segment failed, file writer {} is destoryed", segid); + return Status::Corruption( + "close_writer failed, file writer {} is destoryed, fiel type is {}", segid, + file_type); } auto st = file_writer->close(); if (!st.ok()) { @@ -156,10 +166,12 @@ Status LoadStreamWriter::close_segment(uint32_t segid) { return st; } g_load_stream_file_writer_cnt << -1; - LOG(INFO) << "segment " << segid << " path " << file_writer->path().native() - << "closed, written " << file_writer->bytes_appended() << " bytes"; + LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << "closed, written " + << file_writer->bytes_appended() << " bytes" + << ", file type is " << file_type; if (file_writer->bytes_appended() == 0) { - return Status::Corruption("segment {} closed with 0 bytes", file_writer->path().native()); + return 
Status::Corruption("file {} closed with 0 bytes, file type is {}", + file_writer->path().native(), file_type); } return Status::OK(); } @@ -167,35 +179,62 @@ Status LoadStreamWriter::close_segment(uint32_t segid) { Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_schema) { SCOPED_ATTACH_TASK(_query_thread_context); - io::FileWriter* file_writer = nullptr; + size_t segment_file_size = 0; + size_t inverted_file_size = 0; { std::lock_guard lock_guard(_lock); DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", { _is_init = false; }); if (!_is_init) { return Status::Corruption("add_segment failed, LoadStreamWriter is not inited"); } + if (_inverted_file_writers.size() > 0 && + _inverted_file_writers.size() != _segment_file_writers.size()) { + return Status::Corruption( + "add_segment failed, inverted file writer size is {}," + "segment file writer size is {}", + _inverted_file_writers.size(), _segment_file_writers.size()); + } DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid", { segid = _segment_file_writers.size(); }); - if (segid >= _segment_file_writers.size()) { - return Status::Corruption("add_segment failed, segment {} is never opened", segid); + RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &segment_file_size)); + if (_inverted_file_writers.size() > 0) { + RETURN_IF_ERROR( + _calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &inverted_file_size)); } - file_writer = _segment_file_writers[segid].get(); } - DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.null_file_writer", { file_writer = nullptr; }); + + if (segment_file_size + inverted_file_size != stat.data_size) { + return Status::Corruption( + "add_segment failed, segment stat {} does not match, file size={}, inverted file " + "size={}, stat.data_size={}, tablet id={}", + segid, segment_file_size, inverted_file_size, stat.data_size, _req.tablet_id); + } + + return _rowset_writer->add_segment(segid, stat, flush_schema); +} + +Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) { + io::FileWriter* file_writer = nullptr; + auto& file_writers = + (file_type == FileType::SEGMENT_FILE) ? 
_segment_file_writers : _inverted_file_writers; + + if (segid >= file_writers.size()) { + return Status::Corruption("calc file size failed, file {} is never opened, file type is {}", + segid, file_type); + } + file_writer = file_writers[segid].get(); + DBUG_EXECUTE_IF("LoadStreamWriter.calc_file_size.null_file_writer", { file_writer = nullptr; }); if (file_writer == nullptr) { - return Status::Corruption("add_segment failed, file writer {} is destoryed", segid); + return Status::Corruption( + "calc file size failed, file writer {} is destoryed, file type is {}", segid, + file_type); } if (file_writer->state() != io::FileWriter::State::CLOSED) { - return Status::Corruption("add_segment failed, segment {} is not closed", + return Status::Corruption("calc file size failed, file {} is not closed", file_writer->path().native()); } - if (file_writer->bytes_appended() != stat.data_size) { - return Status::Corruption( - "add_segment failed, segment stat {} does not match, file size={}, " - "stat.data_size={}", - file_writer->path().native(), file_writer->bytes_appended(), stat.data_size); - } - return _rowset_writer->add_segment(segid, stat, flush_schema); + *file_size = file_writer->bytes_appended(); + return Status::OK(); } Status LoadStreamWriter::close() { @@ -224,6 +263,14 @@ Status LoadStreamWriter::close() { } } + for (const auto& writer : _inverted_file_writers) { + if (writer->state() != io::FileWriter::State::CLOSED) { + return Status::Corruption( + "LoadStreamWriter close failed, inverted file {} is not closed", + writer->path().native()); + } + } + RETURN_IF_ERROR(_rowset_builder->build_rowset()); RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task()); RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap()); diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index 9e3fce3c7db..b22817cb85c 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -17,6 +17,8 @@ #pragma once +#include <gen_cpp/internal_service.pb.h> + #include <atomic> #include <memory> #include <mutex> @@ -61,12 +63,15 @@ public: Status init(); - Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf); + Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf, + FileType file_type = FileType::SEGMENT_FILE); - Status close_segment(uint32_t segid); + Status close_writer(uint32_t segid, FileType file_type); Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema); + Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size); + // wait for all memtables to be flushed. 
Status close(); @@ -81,6 +86,7 @@ private: std::unordered_map<uint32_t /*segid*/, SegmentStatisticsSharedPtr> _segment_stat_map; std::mutex _segment_stat_map_lock; std::vector<io::FileWriterPtr> _segment_file_writers; + std::vector<io::FileWriterPtr> _inverted_file_writers; QueryThreadContext _query_thread_context; }; diff --git a/be/src/util/thrift_util.cpp b/be/src/util/thrift_util.cpp index 395c01ec390..2efb012aa20 100644 --- a/be/src/util/thrift_util.cpp +++ b/be/src/util/thrift_util.cpp @@ -156,7 +156,7 @@ std::string to_string(const TUniqueId& id) { return std::to_string(id.hi).append(std::to_string(id.lo)); } -bool _has_inverted_index_or_partial_update(TOlapTableSink sink) { +bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink) { OlapTableSchemaParam schema; if (!schema.init(sink.schema).ok()) { return false; @@ -167,7 +167,12 @@ bool _has_inverted_index_or_partial_update(TOlapTableSink sink) { for (const auto& index_schema : schema.indexes()) { for (const auto& index : index_schema->indexes) { if (index->index_type() == INVERTED) { - return true; + if (sink.schema.inverted_index_file_storage_format == + TInvertedIndexFileStorageFormat::V1) { + return true; + } else { + return false; + } } } } diff --git a/be/src/util/thrift_util.h b/be/src/util/thrift_util.h index 9f4792ff64b..a7d6620d5d3 100644 --- a/be/src/util/thrift_util.h +++ b/be/src/util/thrift_util.h @@ -177,6 +177,6 @@ bool t_network_address_comparator(const TNetworkAddress& a, const TNetworkAddres PURE std::string to_string(const TUniqueId& id); -PURE bool _has_inverted_index_or_partial_update(TOlapTableSink sink); +PURE bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink); } // namespace doris diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index caebb381db6..63f91678989 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -206,7 +206,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, // APPEND_DATA Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span<const Slice> data, - bool segment_eos) { + bool segment_eos, FileType file_type) { PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -217,6 +217,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 header.set_segment_eos(segment_eos); header.set_offset(offset); header.set_opcode(doris::PStreamHeader::APPEND_DATA); + header.set_file_type(file_type); return _encode_and_send(header, data); } diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index a7d34ff8569..dd15eb7bf4c 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -137,7 +137,7 @@ public: Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span<const Slice> data, - bool segment_eos = false); + bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE); // ADD_SEGMENT Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index b9b0e0818cf..69f286b205b 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -60,7 +60,8 @@ class StreamSinkFileWriterTest : public testing::Test { 
// APPEND_DATA virtual Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span<const Slice> data, - bool segment_eos = false) override { + bool segment_eos = false, + FileType file_type = FileType::SEGMENT_FILE) override { EXPECT_EQ(PARTITION_ID, partition_id); EXPECT_EQ(INDEX_ID, index_id); EXPECT_EQ(TABLET_ID, tablet_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 8c40e467338..996f9d2fc1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -327,6 +327,7 @@ public class OlapTableSink extends DataSink { } } } + schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); return schemaParam; } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 9d8a72e01cc..4457c50917b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -24,6 +24,7 @@ option java_package = "org.apache.doris.proto"; import "data.proto"; import "descriptors.proto"; import "types.proto"; +import "olap_common.proto"; import "olap_file.proto"; option cc_generic_services = true; @@ -909,6 +910,7 @@ message PStreamHeader { repeated PTabletID tablets = 10; optional TabletSchemaPB flush_schema = 11; optional uint64 offset = 12; + optional FileType file_type = 13; } message PGetWalQueueSizeRequest{ diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto index a452e0ff6a6..e60aa7603fc 100644 --- a/gensrc/proto/olap_common.proto +++ b/gensrc/proto/olap_common.proto @@ -58,3 +58,8 @@ message PTopNCounter { required uint32 space_expand_rate = 2; repeated PCounter counter = 3; } + +enum FileType { + SEGMENT_FILE = 1; + INVERTED_INDEX_FILE = 2; +} \ No newline at end of file diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 2b8a74afd66..cb844c93361 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -249,6 +249,7 @@ struct TOlapTableSchemaParam { 10: optional bool is_strict_mode = false 11: optional string auto_increment_column 12: optional i32 auto_increment_column_unique_id = -1 + 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 } struct TTabletLocation { diff --git a/regression-test/data/inverted_index_p0/load/test_insert.out b/regression-test/data/inverted_index_p0/load/test_insert.out new file mode 100644 index 00000000000..b8f7f12afbc --- /dev/null +++ b/regression-test/data/inverted_index_p0/load/test_insert.out @@ -0,0 +1,73 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + diff --git a/regression-test/data/inverted_index_p0/load/test_stream_load.out b/regression-test/data/inverted_index_p0/load/test_stream_load.out new file mode 100644 index 00000000000..5723e4218d2 --- /dev/null +++ b/regression-test/data/inverted_index_p0/load/test_stream_load.out @@ -0,0 +1,45 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql_1 -- +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot + +-- !sql_1 -- +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot + diff --git a/regression-test/data/fault_injection_p0/test_index_lowercase_fault_injection.out b/regression-test/data/inverted_index_p0/test_index_lowercase_fault_injection.out similarity index 100% rename from regression-test/data/fault_injection_p0/test_index_lowercase_fault_injection.out rename to regression-test/data/inverted_index_p0/test_index_lowercase_fault_injection.out diff --git a/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out b/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out new file mode 100644 index 00000000000..5c6d903a9b4 --- /dev/null +++ b/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out @@ -0,0 +1,43 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql_1 -- +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_1 -- +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot +davidjhulse/davesbingrewardsbot + +-- !sql_2 -- +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + +-- !sql_3 -- +2 {"a":18811,"b":"hello world","c":1181111} +4 {"a":1234,"b":"hello xxx world","c":8181111} + diff --git a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy index ea9e9ffb8bb..fb04b128822 100644 --- a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy @@ -96,8 +96,11 @@ suite("test_delta_writer_v2_back_pressure_fault_injection", "nonConcurrent") { logger.info(res.toString()) } } + } catch(Exception e) { logger.info(e.getMessage()) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure") } sql """ DROP TABLE IF EXISTS `baseall` """ diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy index 65e68f3d3fa..6a6aa0efd43 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy @@ -143,22 +143,22 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "") // LoadStreamWriter append_data meet null file writer error load_with_injection("LoadStreamWriter.append_data.null_file_writer", "") - // LoadStreamWriter close_segment meet not inited error - load_with_injection("LoadStreamWriter.close_segment.uninited_writer", "") - // LoadStreamWriter close_segment meet not bad segid error - load_with_injection("LoadStreamWriter.close_segment.bad_segid", "") - // LoadStreamWriter close_segment meet null file writer error - load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "") - // LoadStreamWriter close_segment meet file writer failed to close error + // LoadStreamWriter close_writer meet not inited error + load_with_injection("LoadStreamWriter.close_writer.uninited_writer", "") + // LoadStreamWriter close_writer meet not bad segid error + load_with_injection("LoadStreamWriter.close_writer.bad_segid", "") + // LoadStreamWriter close_writer meet null file writer error + 
load_with_injection("LoadStreamWriter.close_writer.null_file_writer", "") + // LoadStreamWriter close_writer meet file writer failed to close error load_with_injection("LocalFileWriter.close.failed", "") - // LoadStreamWriter close_segment meet bytes_appended and real file size not match error - load_with_injection("FileWriter.close_segment.zero_bytes_appended", "") + // LoadStreamWriter close_writer meet bytes_appended and real file size not match error + load_with_injection("FileWriter.close_writer.zero_bytes_appended", "") // LoadStreamWriter add_segment meet not inited error load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "") // LoadStreamWriter add_segment meet not bad segid error load_with_injection("LoadStreamWriter.add_segment.bad_segid", "") // LoadStreamWriter add_segment meet null file writer error - load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "") + load_with_injection("LoadStreamWriter.calc_file_size.null_file_writer", "") // LoadStreamWriter add_segment meet bytes_appended and real file size not match error load_with_injection("FileWriter.add_segment.zero_bytes_appended", "") // LoadStream init failed coz LoadStreamWriter init failed diff --git a/regression-test/suites/inverted_index_p0/load/test_insert.groovy b/regression-test/suites/inverted_index_p0/load/test_insert.groovy new file mode 100644 index 00000000000..97b3ca07937 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/load/test_insert.groovy @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_insert_with_index", "p0") { + + def set_be_config = { key, value -> + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + def test = { format -> + def srcName = "src_table" + def dstName = "dst_table" + sql """ DROP TABLE IF EXISTS ${srcName}; """ + sql """ + CREATE TABLE ${srcName} ( + k bigint, + v variant, + INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`k`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k`) BUCKETS 2 + PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = ${format}); + """ + + sql """insert into ${srcName} values(1, '{"a" : 123, "b" : "xxxyyy", "c" : 111999111}')""" + sql """insert into ${srcName} values(2, '{"a" : 18811, "b" : "hello world", "c" : 1181111}')""" + sql """insert into ${srcName} values(3, '{"a" : 18811, "b" : "hello wworld", "c" : 11111}')""" + sql """insert into ${srcName} values(4, '{"a" : 1234, "b" : "hello xxx world", "c" : 8181111}')""" + qt_sql_2 """select * from ${srcName} where cast(v["a"] as smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 order by k""" + sql """insert into ${srcName} values(5, '{"a" : 123456789, "b" : 123456, "c" : 8181111}')""" + qt_sql_3 """select * from ${srcName} where cast(v["a"] as int) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order by k""" + + sql """ DROP TABLE IF EXISTS ${dstName}; """ + sql """ + CREATE TABLE ${dstName} ( + k bigint, + v variant, + INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`k`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k`) BUCKETS 2 + PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = ${format}); + """ + sql """ insert into ${dstName} select * from ${srcName}""" + qt_sql_2 """select * from ${srcName} where cast(v["a"] as smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 order by k""" + qt_sql_3 """select * from ${srcName} where cast(v["a"] as int) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order by k""" + sql """ DROP TABLE IF EXISTS ${dstName}; """ + sql """ DROP TABLE IF EXISTS ${srcName}; """ + } + + set_be_config("inverted_index_ram_dir_enable", "true") + test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "false") + test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "true") +} \ No newline at end of file diff --git a/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy b/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy new file mode 100644 index 00000000000..0fd0ca35627 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_spark_load_with_index_p0", "p0") { + + def set_be_config = { key, value -> + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def test = { format -> + // Need spark cluster, upload data file to hdfs + def testTable = "tbl_test_spark_load" + def testTable2 = "tbl_test_spark_load2" + def testResource = "spark_resource" + def yarnAddress = "master:8032" + def hdfsAddress = "hdfs://master:9000" + def hdfsWorkingDir = "hdfs://master:9000/doris" + brokerName =getBrokerName() + hdfsUser = getHdfsUser() + hdfsPasswd = getHdfsPasswd() + + def create_test_table = {testTablex -> + def result1 = sql """ + CREATE TABLE IF NOT EXISTS ${testTablex} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL, + INDEX idx_c_varchar(c_varchar) USING INVERTED, + INDEX idx_c_datetime(c_datetime) USING INVERTED + ) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "inverted_index_storage_format" = ${format} + ) + """ + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Create table should update 0 rows") + } + + def create_spark_resource = {sparkType, sparkMaster, sparkQueue -> + def result1 = sql """ + CREATE EXTERNAL RESOURCE "${testResource}" + PROPERTIES + ( + "type" = "spark", + "spark.master" = "yarn", + "spark.submit.deployMode" = "cluster", + "spark.executor.memory" = "1g", + "spark.yarn.queue" = "default", + "spark.hadoop.yarn.resourcemanager.address" = "${yarnAddress}", + "spark.hadoop.fs.defaultFS" = "${hdfsAddress}", + "working_dir" = "${hdfsWorkingDir}", + "broker" = "${brokerName}", + "broker.username" = "${hdfsUser}", + "broker.password" = "${hdfsPasswd}" + ); + """ + + // DDL/DML return 1 row and 3 column, the only value is update row count + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Create resource should update 0 rows") + } + + def load_from_hdfs_use_spark = {testTablex, testTablex2, label, hdfsFilePath1, hdfsFilePath2 -> + def result1= sql """ + LOAD LABEL ${label} + ( + DATA INFILE("${hdfsFilePath1}") + INTO TABLE ${testTablex} + COLUMNS TERMINATED BY ",", + DATA INFILE("${hdfsFilePath2}") + INTO 
TABLE ${testTablex2} + COLUMNS TERMINATED BY "|" + ) + WITH RESOURCE '${testResource}' + ( + "spark.executor.memory" = "2g", + "spark.shuffle.compress" = "true" + ) + PROPERTIES + ( + "timeout" = "3600" + ); + """ + + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected") + } + + def check_load_result = {checklabel, testTablex, testTablex2 -> + max_try_milli_secs = 10000 + while(max_try_milli_secs) { + result = sql "show load where label = '${checklabel}'" + if(result[0][2] == "FINISHED") { + sql "sync" + qt_select "select * from ${testTablex} order by c_int" + qt_select "select * from ${testTablex2} order by c_int" + break + } else { + sleep(1000) // wait 1 second every time + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertEquals(1, 2) + } + } + } + } + + // if 'enableHdfs' in regression-conf.groovy has been set to true, + if (enableHdfs()) { + def hdfs_txt_file_path1 = uploadToHdfs "load_p0/spark_load/all_types1.txt" + def hdfs_txt_file_path2 = uploadToHdfs "load_p0/spark_load/all_types2.txt" + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql "DROP TABLE IF EXISTS ${testTable2}" + create_test_table.call(testTable) + create_test_table.call(testTable2) + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs.call(testTable, testTable2, test_load_label, hdfs_txt_file_path1, hdfs_txt_file_path2) + check_load_result.call(test_load_label, testTable, testTable2) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + try_sql("DROP TABLE IF EXISTS ${testTable2}") + } + } + } + + set_be_config("inverted_index_ram_dir_enable", "true") + test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "false") + test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "true") +} diff --git a/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy b/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy new file mode 100644 index 00000000000..f29ff3b3512 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_stream_load_with_inverted_index_p0", "nonCurrent") { + + def set_be_config = { key, value -> + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def tableName = "test_stream_load_with_inverted_index" + def calc_file_crc_on_tablet = { ip, port, tablet -> + return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet)) + } + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + set 'memtable_on_sink_node', 'true' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + boolean disableAutoCompaction = true + boolean has_update_be_config = false + try { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + } + } + set_be_config.call("disable_auto_compaction", "true") + has_update_be_config = true + + def test = { format -> + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE ${tableName} ( + k bigint, + v variant, + INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`k`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k`) BUCKETS 10 + PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = ${format}); + """ + + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + 
load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = calc_file_crc_on_tablet(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run calc file: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def resultJson = parseJson(out.trim()) + assertEquals(resultJson.start_version, "0") + assertEquals(resultJson.end_version, "11") + assertEquals(resultJson.rowset_count, "11") + } + qt_sql_1 """ + select cast(v["repo"]["name"] as string) from ${tableName} where cast(v["repo"]["name"] as string) match_phrase_prefix "davesbingrewardsbot"; + """ + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + } + + set_be_config("inverted_index_ram_dir_enable", "true") + // test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "false") + // test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "true") + + } finally { + if (has_update_be_config) { + set_be_config.call("disable_auto_compaction", disableAutoCompaction.toString()) + } + } +} \ No newline at end of file diff --git a/regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy b/regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy similarity index 99% rename from regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy rename to regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy index e18060a7d68..2ed2a04a93b 100644 --- a/regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy +++ b/regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy @@ -77,4 +77,4 @@ suite("test_index_lowercase_fault_injection", "nonConcurrent") { qt_sql """ select count() from ${testTable} where (request match 'http'); """ } finally { } -} \ No newline at end of file + } diff --git a/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy b/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy new file mode 100644 index 00000000000..a68870e80d6 --- /dev/null +++ b/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_stream_load_with_inverted_index", "p2") { + def tableName = "test_stream_load_with_inverted_index" + + def set_be_config = { key, value -> + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def calc_file_crc_on_tablet = { ip, port, tablet -> + return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet)) + } + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + set 'memtable_on_sink_node', 'true' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + boolean disableAutoCompaction = true + boolean has_update_be_config = false + try { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + } + } + set_be_config.call("disable_auto_compaction", "true") + has_update_be_config = true + + def test = { format -> + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE ${tableName} ( + k bigint, + v variant, + INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`k`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( "replication_num" = "2", "inverted_index_storage_format" = ${format}); + """ + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + String 
first_backend_id; + List<String> other_backend_id = new ArrayList<>() + + String tablet_id = tablets[0].TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + for (def tablet in tablets) { + first_backend_id = tablet.BackendId + other_backend_id.add(tablet.BackendId) + } + other_backend_id.remove(first_backend_id) + + def checkTabletFileCrc = { + def (first_code, first_out, first_err) = calc_file_crc_on_tablet(backendId_to_backendIP[first_backend_id], backendId_to_backendHttpPort[first_backend_id], tablet_id) + logger.info("Run calc_file_crc_on_tablet: ip=" + backendId_to_backendIP[first_backend_id] + " code=" + first_code + ", out=" + first_out + ", err=" + first_err) + + for (String backend: other_backend_id) { + def (other_code, other_out, other_err) = calc_file_crc_on_tablet(backendId_to_backendIP[backend], backendId_to_backendHttpPort[backend], tablet_id) + logger.info("Run calc_file_crc_on_tablet: ip=" + backendId_to_backendIP[backend] + " code=" + other_code + ", out=" + other_out + ", err=" + other_err) + assertTrue(parseJson(first_out.trim()).crc_value == parseJson(other_out.trim()).crc_value) + assertTrue(parseJson(first_out.trim()).start_version == parseJson(other_out.trim()).start_version) + assertTrue(parseJson(first_out.trim()).end_version == parseJson(other_out.trim()).end_version) + assertTrue(parseJson(first_out.trim()).file_count == parseJson(other_out.trim()).file_count) + assertTrue(parseJson(first_out.trim()).rowset_count == parseJson(other_out.trim()).rowset_count) + } + } + + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + + + // check + checkTabletFileCrc.call() + + qt_sql_1 """ + select cast(v["repo"]["name"] as string) from ${tableName} where cast(v["repo"]["name"] as string) match "davesbingrewardsbot"; + """ + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE ${tableName} ( + k bigint, + v variant, + INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`k`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( "replication_num" = "2", "inverted_index_storage_format" = ${format}); + """ + + sql """insert into ${tableName} values(1, '{"a" : 123, "b" : "xxxyyy", "c" : 111999111}')""" + sql """insert into ${tableName} values(2, '{"a" : 18811, "b" : "hello world", "c" : 1181111}')""" + sql """insert into ${tableName} values(3, '{"a" : 18811, "b" : "hello wworld", "c" : 11111}')""" + sql """insert into ${tableName} values(4, '{"a" : 1234, "b" : "hello xxx world", "c" : 8181111}')""" + qt_sql_2 """select * from ${tableName} where cast(v["a"] as smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 order by k""" + sql """insert into ${tableName} values(5, '{"a" : 123456789, "b" : 123456, "c" : 8181111}')""" + qt_sql_3 """select * from ${tableName} where cast(v["a"] as int) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order by k""" + + // check + checkTabletFileCrc.call() + sql """ DROP TABLE IF 
EXISTS ${tableName}; """ + } + set_be_config("inverted_index_ram_dir_enable", "true") + // test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "false") + // test.call("V1") + test.call("V2") + set_be_config("inverted_index_ram_dir_enable", "true") + } finally { + if (has_update_be_config) { + set_be_config.call("disable_auto_compaction", disableAutoCompaction.toString()) + } + } +} \ No newline at end of file diff --git a/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy b/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy new file mode 100644 index 00000000000..055cff66c09 --- /dev/null +++ b/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_insert_into_with_inverted_index", "p2"){ + def src_table = "srcTable" + def dst_table = "dstTable" + + sql """ + CREATE TABLE IF NOT EXISTS ${src_table} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 2 + properties("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + sql """ + CREATE TABLE IF NOT EXISTS ${dst_table} ( + k bigint, + v variant, + INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 2 + properties("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. 
+ // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + for (int i = 1; i <= 100; i++) { + load_json_data.call(src_table, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + } + + sql """ insert into ${dst_table} select * from ${src_table}""" +} diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy index 5411224c200..77114904f18 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy @@ -34,7 +34,9 @@ suite("test_http_stream", "p0") { dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP, dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP, dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP, - dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP + dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP, + INDEX idx_dt_2 (`dt_2`) USING INVERTED, + INDEX idx_dt_3 (`dt_3`) USING INVERTED ) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ( @@ -298,7 +300,9 @@ suite("test_http_stream", "p0") { sex TINYINT, phone LARGEINT, address VARCHAR(500), - register_time DATETIME + register_time DATETIME, + INDEX idx_username (`username`) USING INVERTED, + INDEX idx_address (`address`) USING INVERTED ) DUPLICATE KEY(`user_id`, `username`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy index ff239e5fef1..9eb948bf55a 100644 --- a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy +++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy @@ -36,7 +36,9 @@ suite("test_mysql_load", "p0") { `v9` date REPLACE_IF_NOT_NULL NULL, `v10` char(10) REPLACE_IF_NOT_NULL NULL, `v11` varchar(6) REPLACE_IF_NOT_NULL NULL, - `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL + `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL, + INDEX idx_k1 (`k1`) USING INVERTED, + INDEX idx_k2 (`k2`) USING INVERTED ) ENGINE=OLAP AGGREGATE KEY(`k1`, `k2`) COMMENT 'OLAP' diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy b/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy index b63c8711301..79c3fecd69e 100644 --- a/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy +++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy @@ -28,7 +28,9 @@ suite("test_mysql_load_big_file", "p0") { `v1` tinyint(4) NULL, `v2` string NULL, `v3` date NULL, - `v4` datetime NULL + `v4` datetime NULL, + INDEX idx_v2 (`v2`) USING INVERTED, + INDEX idx_v3 (`v3`) USING INVERTED ) ENGINE=OLAP DUPLICATE KEY(`k1`, `k2`) COMMENT 'OLAP' --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
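The new regression suites in this patch share one skeleton: flip the backend config inverted_index_ram_dir_enable, then run the same load and query checks under both inverted_index_storage_format values, with the memtable_on_sink_node stream load header set so the memtable is built on the sink node. The sketch below condenses that pattern for readers of this commit mail; it is not one of the committed suites. It reuses the regression framework helpers already visible in the diff (getBackendIpHttpPort, update_be_config, streamLoad, getS3Url, parseJson, assertEquals, assertTrue, logger), while the suite name, table name, and schema are illustrative only.

suite("example_move_memtable_inverted_index_sketch", "p0") {
    // Mirror of the helper used by the suites above: push one BE config
    // key/value pair to every backend in the cluster.
    def set_be_config = { key, value ->
        def backendId_to_backendIP = [:]
        def backendId_to_backendHttpPort = [:]
        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
        for (String backend_id : backendId_to_backendIP.keySet()) {
            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id),
                    backendId_to_backendHttpPort.get(backend_id), key, value)
            logger.info("update config: code=${code}, out=${out}, err=${err}")
        }
    }

    // Illustrative table name and schema.
    def tableName = "example_inverted_index_tbl"

    def runOnce = { format ->
        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql """
            CREATE TABLE ${tableName} (
                k bigint,
                v variant,
                INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english")
            ) ENGINE=OLAP
            DUPLICATE KEY(`k`)
            DISTRIBUTED BY HASH(`k`) BUCKETS 1
            PROPERTIES ("replication_num" = "1", "inverted_index_storage_format" = "${format}")
        """

        streamLoad {
            table "${tableName}"
            set 'read_json_by_line', 'true'
            set 'format', 'json'
            // Build the memtable on the sink node so this load path is the
            // one exercised for the inverted index files.
            set 'memtable_on_sink_node', 'true'
            file """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}"""
            time 10000
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                assertEquals("success", parseJson(result).Status.toLowerCase())
            }
        }

        sql "sync"
        // Query through the inverted index to confirm it is readable after the load.
        def rows = sql """ select count() from ${tableName}
                           where cast(v["repo"]["name"] as string) match "davesbingrewardsbot" """
        assertTrue(rows[0][0] > 0)
        sql """ DROP TABLE IF EXISTS ${tableName} """
    }

    // Run both storage formats with and without the inverted index RAM directory,
    // then set the config back to true at the end, as the committed suites do.
    set_be_config("inverted_index_ram_dir_enable", "true")
    runOnce.call("V1")
    runOnce.call("V2")
    set_be_config("inverted_index_ram_dir_enable", "false")
    runOnce.call("V1")
    runOnce.call("V2")
    set_be_config("inverted_index_ram_dir_enable", "true")
}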