This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bf51d5b82a1 [feature](move-memtable) support for inverted index file (#35891)
bf51d5b82a1 is described below

commit bf51d5b82a193d3515d730d673e99585e4867b24
Author: Sun Chenyang <csun5...@gmail.com>
AuthorDate: Fri Jul 5 10:03:11 2024 +0800

    [feature](move-memtable) support for inverted index file (#35891)
    
    Support the inverted index file in move-memtable (memtable on sink node):
    a FileType field is added to PStreamHeader so the sink side can stream
    both segment data and the V2 inverted index file, and the receiving
    LoadStreamWriter keeps a separate set of file writers for each type.
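    
    A minimal, standalone sketch of the routing idea (hypothetical, simplified
    types; only the FileType values mirror the new enum in olap_common.proto,
    and the Receiver merely stands in for LoadStreamWriter, which keeps
    separate segment and inverted index file writers; none of this is Doris
    code):
    
        #include <cstdint>
        #include <iostream>
        #include <map>
        #include <string>
        #include <vector>
        
        // Mirrors the FileType enum added to olap_common.proto.
        enum class FileType { SEGMENT_FILE = 1, INVERTED_INDEX_FILE = 2 };
        
        // Hypothetical receiver: one file list per file type, so segment bytes
        // and index bytes for the same segment id land in different files.
        struct Receiver {
            std::map<FileType, std::vector<std::string>> files;
        
            void append(uint32_t segid, FileType type, const std::string& data) {
                auto& list = files[type];
                if (segid >= list.size()) {
                    list.resize(segid + 1);
                }
                list[segid] += data;
            }
        };
        
        int main() {
            Receiver r;
            // The sender tags every append with a file type, as PStreamHeader
            // now carries file_type and StreamSinkFileWriter forwards it.
            r.append(0, FileType::SEGMENT_FILE, "segment bytes");
            r.append(0, FileType::INVERTED_INDEX_FILE, "index bytes");
            std::cout << r.files[FileType::SEGMENT_FILE][0] << "\n"
                      << r.files[FileType::INVERTED_INDEX_FILE][0] << "\n";
            return 0;
        }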
---
 be/src/io/fs/stream_sink_file_writer.cpp           |  12 +-
 be/src/io/fs/stream_sink_file_writer.h             |   5 +-
 be/src/olap/delta_writer_v2.cpp                    |   1 +
 be/src/olap/rowset/beta_rowset_writer.cpp          |  17 +-
 be/src/olap/rowset/beta_rowset_writer.h            |   3 +-
 be/src/olap/rowset/beta_rowset_writer_v2.cpp       |   6 +-
 be/src/olap/rowset/beta_rowset_writer_v2.h         |   3 +-
 be/src/olap/rowset/rowset_writer.h                 |   4 +-
 be/src/olap/rowset/rowset_writer_context.h         |   3 +
 be/src/olap/rowset/segment_creator.cpp             |  40 +++--
 be/src/olap/rowset/segment_creator.h               |  10 +-
 .../segment_v2/inverted_index_file_writer.cpp      |  12 +-
 .../rowset/segment_v2/inverted_index_file_writer.h |   9 +-
 .../segment_v2/inverted_index_fs_directory.cpp     | 101 +++++++++++
 .../segment_v2/inverted_index_fs_directory.h       |   3 +
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  11 +-
 be/src/olap/rowset/segment_v2/segment_writer.h     |   4 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  11 +-
 .../rowset/segment_v2/vertical_segment_writer.h    |   3 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   2 +-
 be/src/runtime/load_stream.cpp                     |  14 +-
 be/src/runtime/load_stream.h                       |   2 +-
 be/src/runtime/load_stream_writer.cpp              | 113 +++++++++----
 be/src/runtime/load_stream_writer.h                |  10 +-
 be/src/util/thrift_util.cpp                        |   9 +-
 be/src/util/thrift_util.h                          |   2 +-
 be/src/vec/sink/load_stream_stub.cpp               |   3 +-
 be/src/vec/sink/load_stream_stub.h                 |   2 +-
 be/test/io/fs/stream_sink_file_writer_test.cpp     |   3 +-
 .../org/apache/doris/planner/OlapTableSink.java    |   1 +
 gensrc/proto/internal_service.proto                |   2 +
 gensrc/proto/olap_common.proto                     |   5 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 .../data/inverted_index_p0/load/test_insert.out    |  73 ++++++++
 .../inverted_index_p0/load/test_stream_load.out    |  45 +++++
 .../test_index_lowercase_fault_injection.out       |   0
 .../test_stream_load_with_inverted_index.out       |  43 +++++
 ..._writer_v2_back_pressure_fault_injection.groovy |   3 +
 .../test_load_stream_fault_injection.groovy        |  20 +--
 .../inverted_index_p0/load/test_insert.groovy      |  81 +++++++++
 .../inverted_index_p0/load/test_spark_load.groovy  | 174 +++++++++++++++++++
 .../inverted_index_p0/load/test_stream_load.groovy | 150 +++++++++++++++++
 .../test_index_lowercase_fault_injection.groovy    |   2 +-
 .../test_stream_load_with_inverted_index.groovy    | 185 +++++++++++++++++++++
 .../test_insert_into_index.groovy                  |  75 +++++++++
 .../load_p0/http_stream/test_http_stream.groovy    |   8 +-
 .../load_p0/mysql_load/test_mysql_load.groovy      |   4 +-
 .../mysql_load/test_mysql_load_big_file.groovy     |   4 +-
 48 files changed, 1187 insertions(+), 107 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index bca548be9a2..1d7f823af10 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -28,15 +28,17 @@
 namespace doris::io {
 
 void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, 
int64_t index_id,
-                                int64_t tablet_id, int32_t segment_id) {
+                                int64_t tablet_id, int32_t segment_id, 
FileType file_type) {
     VLOG_DEBUG << "init stream writer, load id(" << 
UniqueId(load_id).to_string()
                << "), partition id(" << partition_id << "), index id(" << 
index_id
-               << "), tablet_id(" << tablet_id << "), segment_id(" << 
segment_id << ")";
+               << "), tablet_id(" << tablet_id << "), segment_id(" << 
segment_id << ")"
+               << ", file_type(" << file_type << ")";
     _load_id = load_id;
     _partition_id = partition_id;
     _index_id = index_id;
     _tablet_id = tablet_id;
     _segment_id = segment_id;
+    _file_type = file_type;
 }
 
 Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
@@ -47,7 +49,7 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
 
     VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
                << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id
-               << ", data_length: " << bytes_req;
+               << ", data_length: " << bytes_req << "file_type" << _file_type;
 
     std::span<const Slice> slices {data, data_cnt};
     size_t stream_index = 0;
@@ -67,7 +69,7 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
         });
         if (!skip_stream) {
             st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
-                                     _bytes_appended, slices);
+                                     _bytes_appended, slices, false, 
_file_type);
         }
         
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
             if (stream_index >= 2) {
@@ -140,7 +142,7 @@ Status StreamSinkFileWriter::_finalize() {
     bool ok = false;
     for (auto& stream : _streams) {
         auto st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
-                                      _bytes_appended, {}, true);
+                                      _bytes_appended, {}, true, _file_type);
         ok = ok || st.ok();
         if (!st.ok()) {
             LOG(WARNING) << "failed to send segment eos to backend " << 
stream->dst_id()
diff --git a/be/src/io/fs/stream_sink_file_writer.h 
b/be/src/io/fs/stream_sink_file_writer.h
index 0d621e8b4c1..0950039077b 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <brpc/stream.h>
-#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_common.pb.h>
 
 #include <queue>
 
@@ -40,7 +40,7 @@ public:
             : _streams(std::move(streams)) {}
 
     void init(PUniqueId load_id, int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
-              int32_t segment_id);
+              int32_t segment_id, FileType file_type = FileType::SEGMENT_FILE);
 
     Status appendv(const Slice* data, size_t data_cnt) override;
 
@@ -69,6 +69,7 @@ private:
     int32_t _segment_id;
     size_t _bytes_appended = 0;
     State _state {State::OPENED};
+    FileType _file_type {FileType::SEGMENT_FILE};
 };
 
 } // namespace io
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 3f2f7bf99fa..805f072f6e6 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -123,6 +123,7 @@ Status DeltaWriterV2::init() {
     context.rowset_id = 
ExecEnv::GetInstance()->storage_engine().next_rowset_id();
     context.data_dir = nullptr;
     context.partial_update_info = _partial_update_info;
+    context.memtable_on_sink_support_index_v2 = true;
 
     _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
     RETURN_IF_ERROR(_rowset_writer->init(context));
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index d418a89a361..17801ec16fd 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -831,10 +831,19 @@ Status BaseBetaRowsetWriter::_create_file_writer(const 
std::string& path,
     return Status::OK();
 }
 
-Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id,
-                                                io::FileWriterPtr& 
file_writer) {
-    auto path = _context.segment_path(segment_id);
-    return _create_file_writer(path, file_writer);
+Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, 
io::FileWriterPtr& file_writer,
+                                                FileType file_type) {
+    auto segment_path = _context.segment_path(segment_id);
+    if (file_type == FileType::INVERTED_INDEX_FILE) {
+        std::string prefix =
+                std::string 
{InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)};
+        std::string index_path = 
InvertedIndexDescriptor::get_index_file_path_v2(prefix);
+        return _create_file_writer(index_path, file_writer);
+    } else if (file_type == FileType::SEGMENT_FILE) {
+        return _create_file_writer(segment_path, file_writer);
+    }
+    return Status::Error<ErrorCode::INTERNAL_ERROR>(
+            fmt::format("failed to create file = {}, file type = {}", 
segment_path, file_type));
 }
 
 Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index d729a15df32..98bb43c6092 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -98,7 +98,8 @@ public:
     Status add_rowset(RowsetSharedPtr rowset) override;
     Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) 
override;
 
-    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) 
override;
+    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer,
+                              FileType file_type = FileType::SEGMENT_FILE) 
override;
 
     Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
                        TabletSchemaSPtr flush_schema) override;
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp 
b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index 1fee4e04034..3ebe331cfc1 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -69,14 +69,14 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& 
rowset_writer_context
     return Status::OK();
 }
 
-Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, 
io::FileWriterPtr& file_writer) {
+Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, 
io::FileWriterPtr& file_writer,
+                                              FileType file_type) {
     auto partition_id = _context.partition_id;
     auto index_id = _context.index_id;
     auto tablet_id = _context.tablet_id;
     auto load_id = _context.load_id;
-
     auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams);
-    stream_writer->init(load_id, partition_id, index_id, tablet_id, 
segment_id);
+    stream_writer->init(load_id, partition_id, index_id, tablet_id, 
segment_id, file_type);
     file_writer = std::move(stream_writer);
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h 
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index e406a2037a7..89bd3045089 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -80,7 +80,8 @@ public:
                 "add_rowset_for_linked_schema_change is not implemented");
     }
 
-    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) 
override;
+    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer,
+                              FileType file_type = FileType::SEGMENT_FILE) 
override;
 
     Status flush() override {
         return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>("flush is not 
implemented");
diff --git a/be/src/olap/rowset/rowset_writer.h 
b/be/src/olap/rowset/rowset_writer.h
index 75a592cf98d..6861b8ab7e2 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/olap_file.pb.h>
 #include <gen_cpp/types.pb.h>
 
@@ -89,7 +90,8 @@ public:
     // Precondition: the input `rowset` should have the same type of the 
rowset we're building
     virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) 
= 0;
 
-    virtual Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& 
writer) {
+    virtual Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& 
writer,
+                                      FileType file_type = 
FileType::SEGMENT_FILE) {
         return Status::NotSupported("RowsetWriter does not support 
create_file_writer");
     }
 
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index 488030993e1..0130916bfb4 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -88,6 +88,9 @@ struct RowsetWriterContext {
     std::shared_ptr<FileWriterCreator> file_writer_creator;
     std::shared_ptr<SegmentCollector> segment_collector;
 
+    // when memtable_on_sink_support_index_v2 is true, create a StreamSinkFileWriter to send the inverted index file
+    bool memtable_on_sink_support_index_v2 = false;
+
     /// begin file cache opts
     bool write_file_cache = false;
     bool is_hot_data = false;
diff --git a/be/src/olap/rowset/segment_creator.cpp 
b/be/src/olap/rowset/segment_creator.cpp
index 738c6e2f9f9..82313f988cb 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -134,8 +134,17 @@ Status 
SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWrit
 
 Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
 writer,
                                               int32_t segment_id, bool 
no_compression) {
-    io::FileWriterPtr file_writer;
-    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, 
file_writer));
+    io::FileWriterPtr segment_file_writer;
+    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, 
segment_file_writer));
+
+    io::FileWriterPtr inverted_file_writer;
+    if (_context.tablet_schema->has_inverted_index() &&
+        _context.tablet_schema->get_inverted_index_storage_format() >=
+                InvertedIndexStorageFormatPB::V2 &&
+        _context.memtable_on_sink_support_index_v2) {
+        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, 
inverted_file_writer,
+                                                             
FileType::INVERTED_INDEX_FILE));
+    }
 
     segment_v2::SegmentWriterOptions writer_options;
     writer_options.enable_unique_key_merge_on_write = 
_context.enable_unique_key_merge_on_write;
@@ -146,9 +155,10 @@ Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
     }
 
     writer = std::make_unique<segment_v2::SegmentWriter>(
-            file_writer.get(), segment_id, _context.tablet_schema, 
_context.tablet,
-            _context.data_dir, _context.max_rows_per_segment, writer_options, 
_context.mow_context);
-    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer)));
+            segment_file_writer.get(), segment_id, _context.tablet_schema, 
_context.tablet,
+            _context.data_dir, _context.max_rows_per_segment, writer_options, 
_context.mow_context,
+            std::move(inverted_file_writer));
+    RETURN_IF_ERROR(_seg_files.add(segment_id, 
std::move(segment_file_writer)));
     auto s = writer->init();
     if (!s.ok()) {
         LOG(WARNING) << "failed to init segment writer: " << s.to_string();
@@ -161,8 +171,17 @@ Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
 Status SegmentFlusher::_create_segment_writer(
         std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t 
segment_id,
         bool no_compression) {
-    io::FileWriterPtr file_writer;
-    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, 
file_writer));
+    io::FileWriterPtr segment_file_writer;
+    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, 
segment_file_writer));
+
+    io::FileWriterPtr inverted_file_writer;
+    if (_context.tablet_schema->has_inverted_index() &&
+        _context.tablet_schema->get_inverted_index_storage_format() >=
+                InvertedIndexStorageFormatPB::V2 &&
+        _context.memtable_on_sink_support_index_v2) {
+        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, 
inverted_file_writer,
+                                                             
FileType::INVERTED_INDEX_FILE));
+    }
 
     segment_v2::VerticalSegmentWriterOptions writer_options;
     writer_options.enable_unique_key_merge_on_write = 
_context.enable_unique_key_merge_on_write;
@@ -173,9 +192,10 @@ Status SegmentFlusher::_create_segment_writer(
     }
 
     writer = std::make_unique<segment_v2::VerticalSegmentWriter>(
-            file_writer.get(), segment_id, _context.tablet_schema, 
_context.tablet,
-            _context.data_dir, _context.max_rows_per_segment, writer_options, 
_context.mow_context);
-    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer)));
+            segment_file_writer.get(), segment_id, _context.tablet_schema, 
_context.tablet,
+            _context.data_dir, _context.max_rows_per_segment, writer_options, 
_context.mow_context,
+            std::move(inverted_file_writer));
+    RETURN_IF_ERROR(_seg_files.add(segment_id, 
std::move(segment_file_writer)));
     auto s = writer->init();
     if (!s.ok()) {
         LOG(WARNING) << "failed to init segment writer: " << s.to_string();
diff --git a/be/src/olap/rowset/segment_creator.h 
b/be/src/olap/rowset/segment_creator.h
index 3226ab0adf8..97a8f177ad9 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -17,9 +17,11 @@
 
 #pragma once
 
+#include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/olap_file.pb.h>
 
 #include <string>
+#include <typeinfo>
 #include <unordered_map>
 #include <vector>
 
@@ -49,7 +51,8 @@ class FileWriterCreator {
 public:
     virtual ~FileWriterCreator() = default;
 
-    virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) 
= 0;
+    virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
+                          FileType file_type = FileType::SEGMENT_FILE) = 0;
 };
 
 template <class T>
@@ -57,8 +60,9 @@ class FileWriterCreatorT : public FileWriterCreator {
 public:
     explicit FileWriterCreatorT(T* t) : _t(t) {}
 
-    Status create(uint32_t segment_id, io::FileWriterPtr& file_writer) 
override {
-        return _t->create_file_writer(segment_id, file_writer);
+    Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
+                  FileType file_type = FileType::SEGMENT_FILE) override {
+        return _t->create_file_writer(segment_id, file_writer, file_type);
     }
 
 private:
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index cdd26fecf87..22d494e5132 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -341,9 +341,15 @@ size_t InvertedIndexFileWriter::write_v2() {
     io::Path index_path 
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
 
     auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, 
index_path.parent_path().c_str());
-
-    auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
-            out_dir->createOutput(index_path.filename().c_str()));
+    std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
+    // a non-null _idx_v2_writer means memtable on sink node is in use
+    if (_idx_v2_writer != nullptr) {
+        compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
+                out_dir->createOutputV2(_idx_v2_writer.get()));
+    } else {
+        compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
+                out_dir->createOutput(index_path.filename().c_str()));
+    }
 
     // Write the version number
     compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h 
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index 7ec71c0b38a..0d82504c07f 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 
 namespace doris {
@@ -46,12 +47,14 @@ class InvertedIndexFileWriter {
 public:
     InvertedIndexFileWriter(io::FileSystemSPtr fs, std::string 
index_path_prefix,
                             std::string rowset_id, int64_t seg_id,
-                            InvertedIndexStorageFormatPB storage_format)
+                            InvertedIndexStorageFormatPB storage_format,
+                            io::FileWriterPtr file_writer = nullptr)
             : _fs(std::move(fs)),
               _index_path_prefix(std::move(index_path_prefix)),
               _rowset_id(std::move(rowset_id)),
               _seg_id(seg_id),
-              _storage_format(storage_format) {}
+              _storage_format(storage_format),
+              _idx_v2_writer(std::move(file_writer)) {}
 
     Result<DorisFSDirectory*> open(const TabletIndex* index_meta);
     Status delete_index(const TabletIndex* index_meta);
@@ -76,6 +79,8 @@ private:
     int64_t _seg_id;
     InvertedIndexStorageFormatPB _storage_format;
     size_t _file_size = 0;
+    // write to disk or stream
+    io::FileWriterPtr _idx_v2_writer;
 };
 } // namespace segment_v2
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 54d484d1199..0443bf345ba 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -98,6 +98,21 @@ public:
     int64_t length() const override;
 };
 
+class DorisFSDirectory::FSIndexOutputV2 : public 
lucene::store::BufferedIndexOutput {
+private:
+    io::FileWriter* _index_v2_file_writer = nullptr;
+
+protected:
+    void flushBuffer(const uint8_t* b, const int32_t size) override;
+
+public:
+    FSIndexOutputV2() = default;
+    void init(io::FileWriter* file_writer);
+    ~FSIndexOutputV2() override;
+    void close() override;
+    int64_t length() const override;
+};
+
 bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const 
char* path,
                                           IndexInput*& ret, CLuceneError& 
error,
                                           int32_t buffer_size) {
@@ -333,6 +348,86 @@ int64_t DorisFSDirectory::FSIndexOutput::length() const {
     return _writer->bytes_appended();
 }
 
+void DorisFSDirectory::FSIndexOutputV2::init(io::FileWriter* file_writer) {
+    _index_v2_file_writer = file_writer;
+    DBUG_EXECUTE_IF(
+            
"DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
+            "init",
+            {
+                _CLTHROWA(CL_ERR_IO,
+                          "debug point: test throw error in fsindexoutput init 
mock error");
+            })
+}
+
+DorisFSDirectory::FSIndexOutputV2::~FSIndexOutputV2() {}
+
+void DorisFSDirectory::FSIndexOutputV2::flushBuffer(const uint8_t* b, const 
int32_t size) {
+    if (_index_v2_file_writer != nullptr && b != nullptr && size > 0) {
+        Slice data {b, (size_t)size};
+        DBUG_EXECUTE_IF(
+                
"DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_"
+                "flushBuffer",
+                { return; })
+        Status st = _index_v2_file_writer->append(data);
+        DBUG_EXECUTE_IF(
+                
"DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer", {
+                    st = 
Status::Error<doris::ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+                            "flush buffer mock error");
+                })
+        if (!st.ok()) {
+            LOG(WARNING) << "File IO Write error: " << st.to_string();
+            _CLTHROWA(CL_ERR_IO, "writer append data when flushBuffer error");
+        }
+    } else {
+        if (_index_v2_file_writer == nullptr) {
+            LOG(WARNING) << "File writer is nullptr in 
DorisFSDirectory::FSIndexOutputV2, "
+                            "ignore flush.";
+            _CLTHROWA(CL_ERR_IO, "flushBuffer error, _index_v2_file_writer = 
nullptr");
+        } else if (b == nullptr) {
+            LOG(WARNING) << "buffer is nullptr when flushBuffer in "
+                            "DorisFSDirectory::FSIndexOutput";
+        }
+    }
+}
+
+void DorisFSDirectory::FSIndexOutputV2::close() {
+    try {
+        BufferedIndexOutput::close();
+        DBUG_EXECUTE_IF(
+                
"DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_"
+                "close",
+                {
+                    _CLTHROWA(CL_ERR_IO,
+                              "debug point: test throw error in 
bufferedindexoutput close");
+                })
+    } catch (CLuceneError& err) {
+        LOG(WARNING) << "FSIndexOutputV2 close, BufferedIndexOutput close 
error: " << err.what();
+        if (err.number() == CL_ERR_IO) {
+            LOG(WARNING) << "FSIndexOutputV2 close, BufferedIndexOutput close 
IO error: "
+                         << err.what();
+        }
+        _CLTHROWA(err.number(), err.what());
+    }
+    if (_index_v2_file_writer) {
+        auto ret = _index_v2_file_writer->close();
+        
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
+                        { ret = Status::Error<INTERNAL_ERROR>("writer close 
status error"); })
+        if (!ret.ok()) {
+            LOG(WARNING) << "FSIndexOutputV2 close, stream sink file writer 
close error: "
+                         << ret.to_string();
+            _CLTHROWA(CL_ERR_IO, ret.to_string().c_str());
+        }
+    } else {
+        LOG(WARNING) << "File writer is nullptr, ignore finalize and close.";
+        _CLTHROWA(CL_ERR_IO, "close file writer error, _index_v2_file_writer = 
nullptr");
+    }
+}
+
+int64_t DorisFSDirectory::FSIndexOutputV2::length() const {
+    CND_PRECONDITION(_index_v2_file_writer != nullptr, "file is not open");
+    return _index_v2_file_writer->bytes_appended();
+}
+
 DorisFSDirectory::DorisFSDirectory() {
     filemode = 0644;
     this->lockFactory = nullptr;
@@ -495,6 +590,12 @@ lucene::store::IndexOutput* 
DorisFSDirectory::createOutput(const char* name) {
     return ret;
 }
 
+lucene::store::IndexOutput* DorisFSDirectory::createOutputV2(io::FileWriter* 
file_writer) {
+    auto* ret = _CLNEW FSIndexOutputV2();
+    ret->init(file_writer);
+    return ret;
+}
+
 std::string DorisFSDirectory::toString() const {
     return std::string("DorisFSDirectory@") + this->directory;
 }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index d9069d66ef2..b3e0352d7ad 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -60,9 +60,11 @@ protected:
 
 public:
     class FSIndexOutput;
+    class FSIndexOutputV2;
     class FSIndexInput;
 
     friend class DorisFSDirectory::FSIndexOutput;
+    friend class DorisFSDirectory::FSIndexOutputV2;
     friend class DorisFSDirectory::FSIndexInput;
 
     const io::FileSystemSPtr& getFileSystem() { return _fs; }
@@ -78,6 +80,7 @@ public:
     void renameFile(const char* from, const char* to) override;
     void touchFile(const char* name) override;
     lucene::store::IndexOutput* createOutput(const char* name) override;
+    lucene::store::IndexOutput* createOutputV2(io::FileWriter* file_writer);
     void close() override;
     std::string toString() const override;
     static const char* getClassName();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 729e2500384..d22e1060dd3 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -43,8 +43,9 @@
 #include "olap/key_coder.h"
 #include "olap/olap_common.h"
 #include "olap/primary_key_index.h"
-#include "olap/row_cursor.h"                      // RowCursor // IWYU pragma: 
keep
-#include "olap/rowset/rowset_writer_context.h"    // RowsetWriterContext
+#include "olap/row_cursor.h"                   // RowCursor // IWYU pragma: 
keep
+#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
+#include "olap/rowset/segment_creator.h"
 #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
 #include "olap/rowset/segment_v2/inverted_index_file_writer.h"
 #include "olap/rowset/segment_v2/inverted_index_writer.h"
@@ -85,7 +86,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, 
uint32_t segment_id,
                              TabletSchemaSPtr tablet_schema, BaseTabletSPtr 
tablet,
                              DataDir* data_dir, uint32_t max_row_per_segment,
                              const SegmentWriterOptions& opts,
-                             std::shared_ptr<MowContext> mow_context)
+                             std::shared_ptr<MowContext> mow_context,
+                             io::FileWriterPtr inverted_file_writer)
         : _segment_id(segment_id),
           _tablet_schema(std::move(tablet_schema)),
           _tablet(std::move(tablet)),
@@ -140,7 +142,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, 
uint32_t segment_id,
                 std::string 
{InvertedIndexDescriptor::get_index_file_path_prefix(
                         file_writer->path().c_str())},
                 _opts.rowset_ctx->rowset_id.to_string(), segment_id,
-                _tablet_schema->get_inverted_index_storage_format());
+                _tablet_schema->get_inverted_index_storage_format(),
+                std::move(inverted_file_writer));
     }
 }
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h 
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 92af12d4da6..9c667ee92fc 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -85,7 +85,8 @@ public:
     explicit SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
                            TabletSchemaSPtr tablet_schema, BaseTabletSPtr 
tablet, DataDir* data_dir,
                            uint32_t max_row_per_segment, const 
SegmentWriterOptions& opts,
-                           std::shared_ptr<MowContext> mow_context);
+                           std::shared_ptr<MowContext> mow_context,
+                           io::FileWriterPtr inverted_file_writer = nullptr);
     ~SegmentWriter();
 
     Status init();
@@ -197,7 +198,6 @@ private:
     // Not owned. owned by RowsetWriter or SegmentFlusher
     io::FileWriter* _file_writer = nullptr;
     std::unique_ptr<InvertedIndexFileWriter> _inverted_index_file_writer;
-
     SegmentFooterPB _footer;
     size_t _num_key_columns;
     size_t _num_short_key_columns;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 0930325d6d8..5d2d6ac0769 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -42,8 +42,9 @@
 #include "olap/olap_common.h"
 #include "olap/partial_update_info.h"
 #include "olap/primary_key_index.h"
-#include "olap/row_cursor.h"                      // RowCursor // IWYU pragma: 
keep
-#include "olap/rowset/rowset_writer_context.h"    // RowsetWriterContext
+#include "olap/row_cursor.h"                   // RowCursor // IWYU pragma: 
keep
+#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
+#include "olap/rowset/segment_creator.h"
 #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/inverted_index_file_writer.h"
@@ -83,7 +84,8 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* 
file_writer, uint32
                                              TabletSchemaSPtr tablet_schema, 
BaseTabletSPtr tablet,
                                              DataDir* data_dir, uint32_t 
max_row_per_segment,
                                              const 
VerticalSegmentWriterOptions& opts,
-                                             std::shared_ptr<MowContext> 
mow_context)
+                                             std::shared_ptr<MowContext> 
mow_context,
+                                             io::FileWriterPtr 
inverted_file_writer)
         : _segment_id(segment_id),
           _tablet_schema(std::move(tablet_schema)),
           _tablet(std::move(tablet)),
@@ -114,7 +116,8 @@ 
VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
                 std::string 
{InvertedIndexDescriptor::get_index_file_path_prefix(
                         _opts.rowset_ctx->segment_path(segment_id))},
                 _opts.rowset_ctx->rowset_id.to_string(), segment_id,
-                _tablet_schema->get_inverted_index_storage_format());
+                _tablet_schema->get_inverted_index_storage_format(),
+                std::move(inverted_file_writer));
     }
 }
 
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index 3809a8301d5..8068b3e44be 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -82,7 +82,8 @@ public:
                                    TabletSchemaSPtr tablet_schema, 
BaseTabletSPtr tablet,
                                    DataDir* data_dir, uint32_t 
max_row_per_segment,
                                    const VerticalSegmentWriterOptions& opts,
-                                   std::shared_ptr<MowContext> mow_context);
+                                   std::shared_ptr<MowContext> mow_context,
+                                   io::FileWriterPtr inverted_file_writer = 
nullptr);
     ~VerticalSegmentWriter();
 
     VerticalSegmentWriter(const VerticalSegmentWriter&) = delete;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 6980f0be3f2..8138c7594b8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -962,7 +962,7 @@ Status 
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
     case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
     case TDataSinkType::OLAP_TABLE_SINK: {
         if (state->query_options().enable_memtable_on_sink_node &&
-            
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+            
!_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
             !config::is_cloud_mode()) {
             _sink.reset(new OlapTableSinkV2OperatorX(pool, 
next_sink_operator_id(), row_desc,
                                                      output_exprs));
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index fc8a6cd1b61..0a35bf6008e 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -136,15 +136,23 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     // Each sender sends data in one segment sequential, so we also do not
     // need a lock here.
     bool eos = header.segment_eos();
+    FileType file_type = header.file_type();
     uint32_t new_segid = mapping->at(segid);
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
     butil::IOBuf buf = data->movable();
-    auto flush_func = [this, new_segid, eos, buf, header]() {
+    auto flush_func = [this, new_segid, eos, buf, header, file_type]() {
         signal::set_signal_task_id(_load_id);
         g_load_stream_flush_running_threads << -1;
-        auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf);
+        auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf, file_type);
         if (eos && st.ok()) {
-            st = _load_stream_writer->close_segment(new_segid);
+            if (file_type == FileType::SEGMENT_FILE || file_type == 
FileType::INVERTED_INDEX_FILE) {
+                st = _load_stream_writer->close_writer(new_segid, file_type);
+            } else {
+                st = Status::InternalError(
+                        "appent data failed, file type error, file type = {}, "
+                        "segment_id={}",
+                        file_type, new_segid);
+            }
         }
         if (!st.ok() && _failed_st->ok()) {
             _failed_st = std::make_shared<Status>(st);
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index b2635698379..9e6e0e36a4b 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <bthread/mutex.h>
-#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_common.pb.h>
 
 #include <condition_variable>
 #include <memory>
diff --git a/be/src/runtime/load_stream_writer.cpp 
b/be/src/runtime/load_stream_writer.cpp
index bc02be98bc4..925229a43ce 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -94,28 +94,31 @@ Status LoadStreamWriter::init() {
     return Status::OK();
 }
 
-Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, 
butil::IOBuf buf) {
+Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, 
butil::IOBuf buf,
+                                     FileType file_type) {
     SCOPED_ATTACH_TASK(_query_thread_context);
     io::FileWriter* file_writer = nullptr;
+    auto& file_writers =
+            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : 
_inverted_file_writers;
     {
         std::lock_guard lock_guard(_lock);
         DCHECK(_is_init);
-        if (segid >= _segment_file_writers.size()) {
-            for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
+        if (segid >= file_writers.size()) {
+            for (size_t i = file_writers.size(); i <= segid; i++) {
                 Status st;
                 io::FileWriterPtr file_writer;
-                st = _rowset_writer->create_file_writer(i, file_writer);
+                st = _rowset_writer->create_file_writer(i, file_writer, 
file_type);
                 if (!st.ok()) {
                     _is_canceled = true;
                     return st;
                 }
-                _segment_file_writers.push_back(std::move(file_writer));
+                file_writers.push_back(std::move(file_writer));
                 g_load_stream_file_writer_cnt << 1;
             }
         }
 
         // TODO: IOBuf to Slice
-        file_writer = _segment_file_writers[segid].get();
+        file_writer = file_writers[segid].get();
     }
     DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { 
file_writer = nullptr; });
     VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
@@ -130,25 +133,32 @@ Status LoadStreamWriter::append_data(uint32_t segid, 
uint64_t offset, butil::IOB
     return file_writer->append(buf.to_string());
 }
 
-Status LoadStreamWriter::close_segment(uint32_t segid) {
+Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
     SCOPED_ATTACH_TASK(_query_thread_context);
     io::FileWriter* file_writer = nullptr;
+    auto& file_writers =
+            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : 
_inverted_file_writers;
     {
         std::lock_guard lock_guard(_lock);
-        DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", { 
_is_init = false; });
+        DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.uninited_writer", { 
_is_init = false; });
         if (!_is_init) {
-            return Status::Corruption("close_segment failed, LoadStreamWriter 
is not inited");
+            return Status::Corruption("close_writer failed, LoadStreamWriter 
is not inited");
         }
-        DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.bad_segid",
-                        { segid = _segment_file_writers.size(); });
-        if (segid >= _segment_file_writers.size()) {
-            return Status::Corruption("close_segment failed, segment {} is 
never opened", segid);
+        DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
+                        { segid = file_writers.size(); });
+        if (segid >= file_writers.size()) {
+            return Status::Corruption(
+                    "close_writer failed, file {} is never opened, file type 
is {}", segid,
+                    file_type);
         }
-        file_writer = _segment_file_writers[segid].get();
+        file_writer = file_writers[segid].get();
     }
-    DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.null_file_writer", { 
file_writer = nullptr; });
+
+    DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { 
file_writer = nullptr; });
     if (file_writer == nullptr) {
-        return Status::Corruption("close_segment failed, file writer {} is 
destoryed", segid);
+        return Status::Corruption(
+                "close_writer failed, file writer {} is destoryed, fiel type 
is {}", segid,
+                file_type);
     }
     auto st = file_writer->close();
     if (!st.ok()) {
@@ -156,10 +166,12 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
         return st;
     }
     g_load_stream_file_writer_cnt << -1;
-    LOG(INFO) << "segment " << segid << " path " << 
file_writer->path().native()
-              << "closed, written " << file_writer->bytes_appended() << " 
bytes";
+    LOG(INFO) << "file " << segid << " path " << file_writer->path().native() 
<< "closed, written "
+              << file_writer->bytes_appended() << " bytes"
+              << ", file type is " << file_type;
     if (file_writer->bytes_appended() == 0) {
-        return Status::Corruption("segment {} closed with 0 bytes", 
file_writer->path().native());
+        return Status::Corruption("file {} closed with 0 bytes, file type is 
{}",
+                                  file_writer->path().native(), file_type);
     }
     return Status::OK();
 }
@@ -167,35 +179,62 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
 Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& 
stat,
                                      TabletSchemaSPtr flush_schema) {
     SCOPED_ATTACH_TASK(_query_thread_context);
-    io::FileWriter* file_writer = nullptr;
+    size_t segment_file_size = 0;
+    size_t inverted_file_size = 0;
     {
         std::lock_guard lock_guard(_lock);
         DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", { 
_is_init = false; });
         if (!_is_init) {
             return Status::Corruption("add_segment failed, LoadStreamWriter is 
not inited");
         }
+        if (_inverted_file_writers.size() > 0 &&
+            _inverted_file_writers.size() != _segment_file_writers.size()) {
+            return Status::Corruption(
+                    "add_segment failed, inverted file writer size is {},"
+                    "segment file writer size is {}",
+                    _inverted_file_writers.size(), 
_segment_file_writers.size());
+        }
         DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
                         { segid = _segment_file_writers.size(); });
-        if (segid >= _segment_file_writers.size()) {
-            return Status::Corruption("add_segment failed, segment {} is never 
opened", segid);
+        RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, 
&segment_file_size));
+        if (_inverted_file_writers.size() > 0) {
+            RETURN_IF_ERROR(
+                    _calc_file_size(segid, FileType::INVERTED_INDEX_FILE, 
&inverted_file_size));
         }
-        file_writer = _segment_file_writers[segid].get();
     }
-    DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.null_file_writer", { 
file_writer = nullptr; });
+
+    if (segment_file_size + inverted_file_size != stat.data_size) {
+        return Status::Corruption(
+                "add_segment failed, segment stat {} does not match, file 
size={}, inverted file "
+                "size={}, stat.data_size={}, tablet id={}",
+                segid, segment_file_size, inverted_file_size, stat.data_size, 
_req.tablet_id);
+    }
+
+    return _rowset_writer->add_segment(segid, stat, flush_schema);
+}
+
+Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, 
size_t* file_size) {
+    io::FileWriter* file_writer = nullptr;
+    auto& file_writers =
+            (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : 
_inverted_file_writers;
+
+    if (segid >= file_writers.size()) {
+        return Status::Corruption("calc file size failed, file {} is never 
opened, file type is {}",
+                                  segid, file_type);
+    }
+    file_writer = file_writers[segid].get();
+    DBUG_EXECUTE_IF("LoadStreamWriter.calc_file_size.null_file_writer", { 
file_writer = nullptr; });
     if (file_writer == nullptr) {
-        return Status::Corruption("add_segment failed, file writer {} is 
destoryed", segid);
+        return Status::Corruption(
+                "calc file size failed, file writer {} is destoryed, file type 
is {}", segid,
+                file_type);
     }
     if (file_writer->state() != io::FileWriter::State::CLOSED) {
-        return Status::Corruption("add_segment failed, segment {} is not 
closed",
+        return Status::Corruption("calc file size failed, file {} is not 
closed",
                                   file_writer->path().native());
     }
-    if (file_writer->bytes_appended() != stat.data_size) {
-        return Status::Corruption(
-                "add_segment failed, segment stat {} does not match, file 
size={}, "
-                "stat.data_size={}",
-                file_writer->path().native(), file_writer->bytes_appended(), 
stat.data_size);
-    }
-    return _rowset_writer->add_segment(segid, stat, flush_schema);
+    *file_size = file_writer->bytes_appended();
+    return Status::OK();
 }
 
 Status LoadStreamWriter::close() {
@@ -224,6 +263,14 @@ Status LoadStreamWriter::close() {
         }
     }
 
+    for (const auto& writer : _inverted_file_writers) {
+        if (writer->state() != io::FileWriter::State::CLOSED) {
+            return Status::Corruption(
+                    "LoadStreamWriter close failed, inverted file {} is not 
closed",
+                    writer->path().native());
+        }
+    }
+
     RETURN_IF_ERROR(_rowset_builder->build_rowset());
     RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
     RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
diff --git a/be/src/runtime/load_stream_writer.h 
b/be/src/runtime/load_stream_writer.h
index 9e3fce3c7db..b22817cb85c 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/internal_service.pb.h>
+
 #include <atomic>
 #include <memory>
 #include <mutex>
@@ -61,12 +63,15 @@ public:
 
     Status init();
 
-    Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf);
+    Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
+                       FileType file_type = FileType::SEGMENT_FILE);
 
-    Status close_segment(uint32_t segid);
+    Status close_writer(uint32_t segid, FileType file_type);
 
     Status add_segment(uint32_t segid, const SegmentStatistics& stat, 
TabletSchemaSPtr flush_chema);
 
+    Status _calc_file_size(uint32_t segid, FileType file_type, size_t* 
file_size);
+
     // wait for all memtables to be flushed.
     Status close();
 
@@ -81,6 +86,7 @@ private:
     std::unordered_map<uint32_t /*segid*/, SegmentStatisticsSharedPtr> 
_segment_stat_map;
     std::mutex _segment_stat_map_lock;
     std::vector<io::FileWriterPtr> _segment_file_writers;
+    std::vector<io::FileWriterPtr> _inverted_file_writers;
     QueryThreadContext _query_thread_context;
 };
 
diff --git a/be/src/util/thrift_util.cpp b/be/src/util/thrift_util.cpp
index 395c01ec390..2efb012aa20 100644
--- a/be/src/util/thrift_util.cpp
+++ b/be/src/util/thrift_util.cpp
@@ -156,7 +156,7 @@ std::string to_string(const TUniqueId& id) {
     return std::to_string(id.hi).append(std::to_string(id.lo));
 }
 
-bool _has_inverted_index_or_partial_update(TOlapTableSink sink) {
+bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink) {
     OlapTableSchemaParam schema;
     if (!schema.init(sink.schema).ok()) {
         return false;
@@ -167,7 +167,12 @@ bool _has_inverted_index_or_partial_update(TOlapTableSink 
sink) {
     for (const auto& index_schema : schema.indexes()) {
         for (const auto& index : index_schema->indexes) {
             if (index->index_type() == INVERTED) {
-                return true;
+                if (sink.schema.inverted_index_file_storage_format ==
+                    TInvertedIndexFileStorageFormat::V1) {
+                    return true;
+                } else {
+                    return false;
+                }
             }
         }
     }
diff --git a/be/src/util/thrift_util.h b/be/src/util/thrift_util.h
index 9f4792ff64b..a7d6620d5d3 100644
--- a/be/src/util/thrift_util.h
+++ b/be/src/util/thrift_util.h
@@ -177,6 +177,6 @@ bool t_network_address_comparator(const TNetworkAddress& a, 
const TNetworkAddres
 
 PURE std::string to_string(const TUniqueId& id);
 
-PURE bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
+PURE bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink);
 
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index caebb381db6..63f91678989 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -206,7 +206,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
 // APPEND_DATA
 Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, uint64_t offset, 
std::span<const Slice> data,
-                                   bool segment_eos) {
+                                   bool segment_eos, FileType file_type) {
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
@@ -217,6 +217,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, 
int64_t index_id, int64
     header.set_segment_eos(segment_eos);
     header.set_offset(offset);
     header.set_opcode(doris::PStreamHeader::APPEND_DATA);
+    header.set_file_type(file_type);
     return _encode_and_send(header, data);
 }
 
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index a7d34ff8569..dd15eb7bf4c 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -137,7 +137,7 @@ public:
             Status
             append_data(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
                         int64_t segment_id, uint64_t offset, std::span<const 
Slice> data,
-                        bool segment_eos = false);
+                        bool segment_eos = false, FileType file_type = 
FileType::SEGMENT_FILE);
 
     // ADD_SEGMENT
     Status add_segment(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp 
b/be/test/io/fs/stream_sink_file_writer_test.cpp
index b9b0e0818cf..69f286b205b 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -60,7 +60,8 @@ class StreamSinkFileWriterTest : public testing::Test {
         // APPEND_DATA
         virtual Status append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, uint64_t offset, 
std::span<const Slice> data,
-                                   bool segment_eos = false) override {
+                                   bool segment_eos = false,
+                                   FileType file_type = 
FileType::SEGMENT_FILE) override {
             EXPECT_EQ(PARTITION_ID, partition_id);
             EXPECT_EQ(INDEX_ID, index_id);
             EXPECT_EQ(TABLET_ID, tablet_id);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 8c40e467338..996f9d2fc1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -327,6 +327,7 @@ public class OlapTableSink extends DataSink {
                 }
             }
         }
+        
schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat());
         return schemaParam;
     }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 9d8a72e01cc..4457c50917b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -24,6 +24,7 @@ option java_package = "org.apache.doris.proto";
 import "data.proto";
 import "descriptors.proto";
 import "types.proto";
+import "olap_common.proto";
 import "olap_file.proto";
 
 option cc_generic_services = true;
@@ -909,6 +910,7 @@ message PStreamHeader {
     repeated PTabletID tablets = 10;
     optional TabletSchemaPB flush_schema = 11;
     optional uint64 offset = 12;
+    optional FileType file_type = 13;
 }
 
 message PGetWalQueueSizeRequest{
diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto
index a452e0ff6a6..e60aa7603fc 100644
--- a/gensrc/proto/olap_common.proto
+++ b/gensrc/proto/olap_common.proto
@@ -58,3 +58,8 @@ message PTopNCounter {
     required uint32 space_expand_rate = 2;
     repeated PCounter counter = 3;
 }
+
+enum FileType {
+  SEGMENT_FILE = 1;
+  INVERTED_INDEX_FILE = 2;
+}
\ No newline at end of file
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 2b8a74afd66..cb844c93361 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -249,6 +249,7 @@ struct TOlapTableSchemaParam {
     10: optional bool is_strict_mode = false
     11: optional string auto_increment_column
     12: optional i32 auto_increment_column_unique_id = -1
+    13: optional Types.TInvertedIndexFileStorageFormat 
inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1
 }
 
 struct TTabletLocation {
diff --git a/regression-test/data/inverted_index_p0/load/test_insert.out 
b/regression-test/data/inverted_index_p0/load/test_insert.out
new file mode 100644
index 00000000000..b8f7f12afbc
--- /dev/null
+++ b/regression-test/data/inverted_index_p0/load/test_insert.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
diff --git a/regression-test/data/inverted_index_p0/load/test_stream_load.out b/regression-test/data/inverted_index_p0/load/test_stream_load.out
new file mode 100644
index 00000000000..5723e4218d2
--- /dev/null
+++ b/regression-test/data/inverted_index_p0/load/test_stream_load.out
@@ -0,0 +1,45 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
diff --git a/regression-test/data/fault_injection_p0/test_index_lowercase_fault_injection.out b/regression-test/data/inverted_index_p0/test_index_lowercase_fault_injection.out
similarity index 100%
rename from regression-test/data/fault_injection_p0/test_index_lowercase_fault_injection.out
rename to regression-test/data/inverted_index_p0/test_index_lowercase_fault_injection.out
diff --git a/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out b/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out
new file mode 100644
index 00000000000..5c6d903a9b4
--- /dev/null
+++ b/regression-test/data/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_1 --
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+davidjhulse/davesbingrewardsbot
+
+-- !sql_2 --
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
+-- !sql_3 --
+2      {"a":18811,"b":"hello world","c":1181111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+
diff --git a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
index ea9e9ffb8bb..fb04b128822 100644
--- a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
@@ -96,8 +96,11 @@ suite("test_delta_writer_v2_back_pressure_fault_injection", "nonConcurrent") {
                 logger.info(res.toString())
             }
         }
+
     } catch(Exception e) {
         logger.info(e.getMessage())
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
     }
 
     sql """ DROP TABLE IF EXISTS `baseall` """
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
index 65e68f3d3fa..6a6aa0efd43 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
@@ -143,22 +143,22 @@ suite("load_stream_fault_injection", "nonConcurrent") {
     load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", 
"")
     // LoadStreamWriter append_data meet null file writer error
     load_with_injection("LoadStreamWriter.append_data.null_file_writer", "")
-    // LoadStreamWriter close_segment meet not inited error
-    load_with_injection("LoadStreamWriter.close_segment.uninited_writer", "")
-    // LoadStreamWriter close_segment meet not bad segid error
-    load_with_injection("LoadStreamWriter.close_segment.bad_segid", "")
-    // LoadStreamWriter close_segment meet null file writer error
-    load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "")
-    // LoadStreamWriter close_segment meet file writer failed to close error
+    // LoadStreamWriter close_writer meet not inited error
+    load_with_injection("LoadStreamWriter.close_writer.uninited_writer", "")
+    // LoadStreamWriter close_writer meet not bad segid error
+    load_with_injection("LoadStreamWriter.close_writer.bad_segid", "")
+    // LoadStreamWriter close_writer meet null file writer error
+    load_with_injection("LoadStreamWriter.close_writer.null_file_writer", "")
+    // LoadStreamWriter close_writer meet file writer failed to close error
     load_with_injection("LocalFileWriter.close.failed", "")
-    // LoadStreamWriter close_segment meet bytes_appended and real file size not match error
-    load_with_injection("FileWriter.close_segment.zero_bytes_appended", "")
+    // LoadStreamWriter close_writer meet bytes_appended and real file size not match error
+    load_with_injection("FileWriter.close_writer.zero_bytes_appended", "")
     // LoadStreamWriter add_segment meet not inited error
     load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "")
     // LoadStreamWriter add_segment meet not bad segid error
     load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
     // LoadStreamWriter add_segment meet null file writer error
-    load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "")
+    load_with_injection("LoadStreamWriter.calc_file_size.null_file_writer", "")
     // LoadStreamWriter add_segment meet bytes_appended and real file size not match error
     load_with_injection("FileWriter.add_segment.zero_bytes_appended", "")
     // LoadStream init failed coz LoadStreamWriter init failed
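
The renamed debug points above (close_segment -> close_writer, add_segment.null_file_writer -> calc_file_size.null_file_writer) are presumably driven the same way as the back-pressure point earlier in this patch: enable the point on all BEs, run a load, and disable it in a finally block. A rough sketch, assuming the enable counterpart of the disableDebugPointForAllBEs helper used above; the load statement and debug point are illustrative:

    def injectAndLoad = { debugPoint ->
        try {
            GetDebugPoint().enableDebugPointForAllBEs(debugPoint)      // assumed enable counterpart
            sql """ INSERT INTO baseall SELECT * FROM baseall; """     // illustrative load that triggers the writer
        } catch (Exception e) {
            logger.info(e.getMessage())
        } finally {
            GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
        }
    }
    injectAndLoad("LoadStreamWriter.close_writer.null_file_writer")
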
diff --git a/regression-test/suites/inverted_index_p0/load/test_insert.groovy b/regression-test/suites/inverted_index_p0/load/test_insert.groovy
new file mode 100644
index 00000000000..97b3ca07937
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/load/test_insert.groovy
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_insert_with_index", "p0") {
+
+     def set_be_config = { key, value ->
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+    def test = { format -> 
+        def srcName = "src_table"
+        def dstName = "dst_table"
+        sql """ DROP TABLE IF EXISTS ${srcName}; """
+        sql """
+            CREATE TABLE ${srcName} (
+                k bigint,
+                v variant,
+                INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k`) BUCKETS 2
+            PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = ${format});
+        """
+
+        sql """insert into ${srcName} values(1, '{"a" : 123, "b" : "xxxyyy", 
"c" : 111999111}')"""
+        sql """insert into ${srcName} values(2, '{"a" : 18811, "b" : "hello 
world", "c" : 1181111}')"""
+        sql """insert into ${srcName} values(3, '{"a" : 18811, "b" : "hello 
wworld", "c" : 11111}')"""
+        sql """insert into ${srcName} values(4, '{"a" : 1234, "b" : "hello xxx 
world", "c" : 8181111}')"""
+        qt_sql_2 """select * from ${srcName} where cast(v["a"] as smallint) > 
123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 
order by k"""
+        sql """insert into ${srcName} values(5, '{"a" : 123456789, "b" : 
123456, "c" : 8181111}')"""
+        qt_sql_3 """select * from ${srcName} where cast(v["a"] as int) > 123 
and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order 
by k"""
+
+        sql """ DROP TABLE IF EXISTS ${dstName}; """
+        sql """
+            CREATE TABLE ${dstName} (
+                k bigint,
+                v variant,
+                INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k`) BUCKETS 2
+            PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = ${format});
+        """
+        sql """ insert into ${dstName} select * from ${srcName}"""
+        qt_sql_2 """select * from ${srcName} where cast(v["a"] as smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 order by k"""
+        qt_sql_3 """select * from ${srcName} where cast(v["a"] as int) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order by k"""
+        sql """ DROP TABLE IF EXISTS ${dstName}; """
+        sql """ DROP TABLE IF EXISTS ${srcName}; """
+    }
+
+    set_be_config("inverted_index_ram_dir_enable", "true")
+    test.call("V1")
+    test.call("V2")
+    set_be_config("inverted_index_ram_dir_enable", "false")
+    test.call("V1")
+    test.call("V2")
+    set_be_config("inverted_index_ram_dir_enable", "true")
+}
\ No newline at end of file
diff --git a/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy b/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy
new file mode 100644
index 00000000000..0fd0ca35627
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/load/test_spark_load.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_spark_load_with_index_p0", "p0") {
+
+    def set_be_config = { key, value ->
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+
+    def test = { format ->
+        // Need spark cluster, upload data file to hdfs
+        def testTable = "tbl_test_spark_load"
+        def testTable2 = "tbl_test_spark_load2"
+        def testResource = "spark_resource"
+        def yarnAddress = "master:8032"
+        def hdfsAddress = "hdfs://master:9000"
+        def hdfsWorkingDir = "hdfs://master:9000/doris"
+        brokerName = getBrokerName()
+        hdfsUser = getHdfsUser()
+        hdfsPasswd = getHdfsPasswd()
+        
+        def create_test_table = {testTablex ->
+            def result1 = sql """
+                CREATE TABLE IF NOT EXISTS ${testTablex} (
+                    c_int int(11) NULL,
+                    c_char char(15) NULL,
+                    c_varchar varchar(100) NULL,
+                    c_bool boolean NULL,
+                    c_tinyint tinyint(4) NULL,
+                    c_smallint smallint(6) NULL,
+                    c_bigint bigint(20) NULL,
+                    c_largeint largeint(40) NULL,
+                    c_float float NULL,
+                    c_double double NULL,
+                    c_decimal decimal(6, 3) NULL,
+                    c_decimalv3 decimal(6, 3) NULL,
+                    c_date date NULL,
+                    c_datev2 date NULL,
+                    c_datetime datetime NULL,
+                    c_datetimev2 datetime NULL,
+                    INDEX idx_c_varchar(c_varchar) USING INVERTED,
+                    INDEX idx_c_datetime(c_datetime) USING INVERTED
+                )
+                DISTRIBUTED BY HASH(c_int) BUCKETS 1
+                PROPERTIES (
+                "replication_num" = "1",
+                "inverted_index_storage_format" = ${format}
+                )
+                """
+            assertTrue(result1.size() == 1)
+            assertTrue(result1[0].size() == 1)
+            assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+        }
+
+        def create_spark_resource = {sparkType, sparkMaster, sparkQueue ->
+            def result1 = sql """
+                CREATE EXTERNAL RESOURCE "${testResource}"
+                PROPERTIES
+                (
+                    "type" = "spark",
+                    "spark.master" = "yarn",
+                    "spark.submit.deployMode" = "cluster",
+                    "spark.executor.memory" = "1g",
+                    "spark.yarn.queue" = "default",
+                    "spark.hadoop.yarn.resourcemanager.address" = 
"${yarnAddress}",
+                    "spark.hadoop.fs.defaultFS" = "${hdfsAddress}",
+                    "working_dir" = "${hdfsWorkingDir}",
+                    "broker" = "${brokerName}",
+                    "broker.username" = "${hdfsUser}",
+                    "broker.password" = "${hdfsPasswd}"
+                );
+                """
+            
+            // DDL/DML return 1 row and 3 column, the only value is update row count
+            assertTrue(result1.size() == 1)
+            assertTrue(result1[0].size() == 1)
+            assertTrue(result1[0][0] == 0, "Create resource should update 0 
rows")
+        }
+
+        def load_from_hdfs_use_spark = {testTablex, testTablex2, label, hdfsFilePath1, hdfsFilePath2 ->
+            def result1= sql """
+                            LOAD LABEL ${label}
+                            (
+                                DATA INFILE("${hdfsFilePath1}")
+                                INTO TABLE ${testTablex}
+                                COLUMNS TERMINATED BY ",",
+                                DATA INFILE("${hdfsFilePath2}")
+                                INTO TABLE ${testTablex2}
+                                COLUMNS TERMINATED BY "|"
+                            )
+                            WITH RESOURCE '${testResource}'
+                            (
+                                "spark.executor.memory" = "2g",
+                                "spark.shuffle.compress" = "true"
+                            )
+                            PROPERTIES
+                            (
+                                "timeout" = "3600"
+                            );
+                            """
+            
+            assertTrue(result1.size() == 1)
+            assertTrue(result1[0].size() == 1)
+            assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+        }
+        
+        def check_load_result = {checklabel, testTablex, testTablex2 ->
+            max_try_milli_secs = 10000
+            while(max_try_milli_secs) {
+                result = sql "show load where label = '${checklabel}'"
+                if(result[0][2] == "FINISHED") {
+                    sql "sync"
+                    qt_select "select * from ${testTablex} order by c_int"
+                    qt_select "select * from ${testTablex2} order by c_int"
+                    break
+                } else {
+                    sleep(1000) // wait 1 second every time
+                    max_try_milli_secs -= 1000
+                    if(max_try_milli_secs <= 0) {
+                        assertEquals(1, 2)
+                    }
+                }
+            }
+        }
+
+        // if 'enableHdfs' in regression-conf.groovy has been set to true,
+        if (enableHdfs()) {
+            def hdfs_txt_file_path1 = uploadToHdfs "load_p0/spark_load/all_types1.txt"
+            def hdfs_txt_file_path2 = uploadToHdfs "load_p0/spark_load/all_types2.txt"
+            try {
+                sql "DROP TABLE IF EXISTS ${testTable}"
+                sql "DROP TABLE IF EXISTS ${testTable2}"
+                create_test_table.call(testTable)
+                create_test_table.call(testTable2)
+                def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
+                load_from_hdfs_use_spark.call(testTable, testTable2, test_load_label, hdfs_txt_file_path1, hdfs_txt_file_path2)
+                check_load_result.call(test_load_label, testTable, testTable2)
+
+            } finally {
+                try_sql("DROP TABLE IF EXISTS ${testTable}")
+                try_sql("DROP TABLE IF EXISTS ${testTable2}")
+            }
+        }
+    }
+    
+    set_be_config("inverted_index_ram_dir_enable", "true")
+    test.call("V1")
+    test.call("V2")
+    set_be_config("inverted_index_ram_dir_enable", "false")
+    test.call("V1")
+    test.call("V2")
+    set_be_config("inverted_index_ram_dir_enable", "true")
+}
diff --git a/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy b/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy
new file mode 100644
index 00000000000..f29ff3b3512
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/load/test_stream_load.groovy
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_stream_load_with_inverted_index_p0", "nonCurrent") {
+
+    def set_be_config = { key, value ->
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+
+    def tableName = "test_stream_load_with_inverted_index"
+    def calc_file_crc_on_tablet = { ip, port, tablet ->
+        return curl("GET", 
String.format("http://%s:%s/api/calc_crc?tablet_id=%s";, ip, port, tablet))
+    }
+
+    def load_json_data = {table_name, file_name ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'read_json_by_line', 'true' 
+            set 'format', 'json' 
+            set 'max_filter_ratio', '0.1'
+            set 'memtable_on_sink_node', 'true'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition is ignored,
+            // so you must check all conditions yourself
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                        throw exception
+                }
+                logger.info("Stream load ${file_name} result: 
${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    boolean disableAutoCompaction = true
+    boolean has_update_be_config = false
+    try {
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + 
err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+            }
+        }
+        set_be_config.call("disable_auto_compaction", "true")
+        has_update_be_config = true
+
+        def test = { format -> 
+            sql """ DROP TABLE IF EXISTS ${tableName}; """
+            sql """
+                CREATE TABLE ${tableName} (
+                    k bigint,
+                    v variant,
+                    INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k`) BUCKETS 10
+                PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = ${format});
+            """
+
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            
+
+            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = calc_file_crc_on_tablet(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run calc file: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def resultJson = parseJson(out.trim())
+                assertEquals(resultJson.start_version, "0")
+                assertEquals(resultJson.end_version, "11")
+                assertEquals(resultJson.rowset_count, "11")
+            }
+            qt_sql_1 """
+            select cast(v["repo"]["name"] as string) from ${tableName} where 
cast(v["repo"]["name"] as string) match_phrase_prefix "davesbingrewardsbot";
+            """
+
+            sql """ DROP TABLE IF EXISTS ${tableName}; """
+        }
+
+        set_be_config("inverted_index_ram_dir_enable", "true")
+        // test.call("V1")
+        test.call("V2")
+        set_be_config("inverted_index_ram_dir_enable", "false")
+        // test.call("V1")
+        test.call("V2")
+        set_be_config("inverted_index_ram_dir_enable", "true")
+        
+    } finally {
+        if (has_update_be_config) {
+            set_be_config.call("disable_auto_compaction", 
disableAutoCompaction.toString())
+        }
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy b/regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy
similarity index 99%
rename from regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy
rename to regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy
index e18060a7d68..2ed2a04a93b 100644
--- a/regression-test/suites/fault_injection_p0/test_index_lowercase_fault_injection.groovy
+++ b/regression-test/suites/inverted_index_p0/test_index_lowercase_fault_injection.groovy
@@ -77,4 +77,4 @@ suite("test_index_lowercase_fault_injection", "nonConcurrent") {
       qt_sql """ select count() from ${testTable} where (request match 
'http');  """
     } finally {
     }
-}
\ No newline at end of file
+ }
diff --git a/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy b/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy
new file mode 100644
index 00000000000..a68870e80d6
--- /dev/null
+++ b/regression-test/suites/inverted_index_p2/load_with_inverted_index_p2/test_stream_load_with_inverted_index.groovy
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_stream_load_with_inverted_index", "p2") {
+    def tableName = "test_stream_load_with_inverted_index"
+
+    def set_be_config = { key, value ->
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+
+    def calc_file_crc_on_tablet = { ip, port, tablet ->
+        return curl("GET", 
String.format("http://%s:%s/api/calc_crc?tablet_id=%s";, ip, port, tablet))
+    }
+
+    def load_json_data = {table_name, file_name ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'read_json_by_line', 'true' 
+            set 'format', 'json' 
+            set 'max_filter_ratio', '0.1'
+            set 'memtable_on_sink_node', 'true'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition is ignored,
+            // so you must check all conditions yourself
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                        throw exception
+                }
+                logger.info("Stream load ${file_name} result: 
${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    boolean disableAutoCompaction = true
+    boolean has_update_be_config = false
+    try {
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + 
err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+            }
+        }
+        set_be_config.call("disable_auto_compaction", "true")
+        has_update_be_config = true
+
+        def test = { format ->
+            sql """ DROP TABLE IF EXISTS ${tableName}; """
+            sql """
+                CREATE TABLE ${tableName} (
+                    k bigint,
+                    v variant,
+                    INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k`) BUCKETS 1
+                PROPERTIES ( "replication_num" = "2", "inverted_index_storage_format" = ${format});
+            """
+
+            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+            String first_backend_id;
+            List<String> other_backend_id = new ArrayList<>()
+        
+            String tablet_id = tablets[0].TabletId
+            def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
+            logger.info("tablet: " + tablet_info)
+            for (def tablet in tablets) {
+                first_backend_id = tablet.BackendId
+                other_backend_id.add(tablet.BackendId)
+            }
+            other_backend_id.remove(first_backend_id)
+
+            def checkTabletFileCrc = {
+                def (first_code, first_out, first_err) = calc_file_crc_on_tablet(backendId_to_backendIP[first_backend_id], backendId_to_backendHttpPort[first_backend_id], tablet_id)
+                logger.info("Run calc_file_crc_on_tablet: ip=" + backendId_to_backendIP[first_backend_id] + " code=" + first_code + ", out=" + first_out + ", err=" + first_err)
+
+                for (String backend: other_backend_id) {
+                    def (other_code, other_out, other_err) = calc_file_crc_on_tablet(backendId_to_backendIP[backend], backendId_to_backendHttpPort[backend], tablet_id)
+                    logger.info("Run calc_file_crc_on_tablet: ip=" + backendId_to_backendIP[backend] + " code=" + other_code + ", out=" + other_out + ", err=" + other_err)
+                    assertTrue(parseJson(first_out.trim()).crc_value == parseJson(other_out.trim()).crc_value)
+                    assertTrue(parseJson(first_out.trim()).start_version == parseJson(other_out.trim()).start_version)
+                    assertTrue(parseJson(first_out.trim()).end_version == parseJson(other_out.trim()).end_version)
+                    assertTrue(parseJson(first_out.trim()).file_count == parseJson(other_out.trim()).file_count)
+                    assertTrue(parseJson(first_out.trim()).rowset_count == parseJson(other_out.trim()).rowset_count)
+                }
+            }
+
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+            load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+
+
+            // check 
+            checkTabletFileCrc.call()
+
+            qt_sql_1 """
+            select cast(v["repo"]["name"] as string) from ${tableName} where 
cast(v["repo"]["name"] as string) match "davesbingrewardsbot";
+            """
+
+            sql """ DROP TABLE IF EXISTS ${tableName}; """
+            sql """
+                CREATE TABLE ${tableName} (
+                    k bigint,
+                    v variant,
+                    INDEX idx_v (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k`) BUCKETS 1
+                PROPERTIES ( "replication_num" = "2", "inverted_index_storage_format" = ${format});
+            """
+
+            sql """insert into ${tableName} values(1, '{"a" : 123, "b" : 
"xxxyyy", "c" : 111999111}')"""
+            sql """insert into ${tableName} values(2, '{"a" : 18811, "b" : 
"hello world", "c" : 1181111}')"""
+            sql """insert into ${tableName} values(3, '{"a" : 18811, "b" : 
"hello wworld", "c" : 11111}')"""
+            sql """insert into ${tableName} values(4, '{"a" : 1234, "b" : 
"hello xxx world", "c" : 8181111}')"""
+            qt_sql_2 """select * from ${tableName} where cast(v["a"] as 
smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as 
int) > 1024 order by k"""
+            sql """insert into ${tableName} values(5, '{"a" : 123456789, "b" : 
123456, "c" : 8181111}')"""
+            qt_sql_3 """select * from ${tableName} where cast(v["a"] as int) > 
123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 
order by k"""
+
+            // check 
+            checkTabletFileCrc.call()
+            sql """ DROP TABLE IF EXISTS ${tableName}; """
+        }
+        set_be_config("inverted_index_ram_dir_enable", "true")
+        // test.call("V1")
+        test.call("V2")
+        set_be_config("inverted_index_ram_dir_enable", "false")
+        // test.call("V1")
+        test.call("V2")
+        set_be_config("inverted_index_ram_dir_enable", "true")
+    } finally {
+        if (has_update_be_config) {
+            set_be_config.call("disable_auto_compaction", 
disableAutoCompaction.toString())
+        }
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy b/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy
new file mode 100644
index 00000000000..055cff66c09
--- /dev/null
+++ b/regression-test/suites/inverted_index_p2/test_insert_into_index.groovy
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_into_with_inverted_index", "p2"){
+    def src_table = "srcTable"
+    def dst_table = "dstTable"
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${src_table} (
+            k bigint,
+            v variant
+        )
+        DUPLICATE KEY(`k`)
+        DISTRIBUTED BY HASH(k) BUCKETS 2
+        properties("replication_num" = "1", "disable_auto_compaction" = 
"true");
+    """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${dst_table} (
+            k bigint,
+            v variant,
+            INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+        )
+        DUPLICATE KEY(`k`)
+        DISTRIBUTED BY HASH(k) BUCKETS 2
+        properties("replication_num" = "1", "disable_auto_compaction" = 
"true");
+    """
+
+    def load_json_data = {table_name, file_name ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'read_json_by_line', 'true' 
+            set 'format', 'json' 
+            set 'max_filter_ratio', '0.1'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition is ignored,
+            // so you must check all conditions yourself
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                        throw exception
+                }
+                logger.info("Stream load ${file_name} result: 
${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+    for (int i = 1; i <= 100; i++) {
+        load_json_data.call(src_table, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+    }
+    
+    sql """ insert into ${dst_table} select * from ${src_table}"""
+}
diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
index 5411224c200..77114904f18 100644
--- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
@@ -34,7 +34,9 @@ suite("test_http_stream", "p0") {
             dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
             dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
             dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
-            dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+            dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP,
+            INDEX idx_dt_2 (`dt_2`) USING INVERTED,
+            INDEX idx_dt_3 (`dt_3`) USING INVERTED
         )
         DISTRIBUTED BY HASH(id) BUCKETS 1
         PROPERTIES (
@@ -298,7 +300,9 @@ suite("test_http_stream", "p0") {
             sex TINYINT,
             phone LARGEINT,
             address VARCHAR(500),
-            register_time DATETIME
+            register_time DATETIME,
+            INDEX idx_username (`username`) USING INVERTED,
+            INDEX idx_address (`address`) USING INVERTED
         )
         DUPLICATE KEY(`user_id`, `username`)
         DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
index ff239e5fef1..9eb948bf55a 100644
--- a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
+++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
@@ -36,7 +36,9 @@ suite("test_mysql_load", "p0") {
             `v9` date REPLACE_IF_NOT_NULL NULL,
             `v10` char(10) REPLACE_IF_NOT_NULL NULL,
             `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
-            `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+            `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL,
+            INDEX idx_k1 (`k1`) USING INVERTED,
+            INDEX idx_k2 (`k2`) USING INVERTED
         ) ENGINE=OLAP
         AGGREGATE KEY(`k1`, `k2`)
         COMMENT 'OLAP'
diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy b/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy
index b63c8711301..79c3fecd69e 100644
--- a/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy
+++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load_big_file.groovy
@@ -28,7 +28,9 @@ suite("test_mysql_load_big_file", "p0") {
             `v1` tinyint(4)  NULL,
             `v2` string  NULL,
             `v3` date  NULL,
-            `v4` datetime  NULL
+            `v4` datetime  NULL,
+            INDEX idx_v2 (`v2`) USING INVERTED,
+            INDEX idx_v3 (`v3`) USING INVERTED
         ) ENGINE=OLAP
         DUPLICATE KEY(`k1`, `k2`)
         COMMENT 'OLAP'


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
