This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b0519d29efa [fix](inverted index) Writing to the inverted index also writes to the file cache. (#39076)
b0519d29efa is described below

commit b0519d29efa43801ae5dd5dfa1e3849632bf731a
Author: zzzxl <33418555+zzzxl1...@users.noreply.github.com>
AuthorDate: Tue Aug 13 16:38:55 2024 +0800

    [fix](inverted index) Writing to the inverted index also writes to the file cache. (#39076)
    
    1. When write_file_cache is true, writing to the inverted index also
    writes to the file cache.
---
 be/src/olap/compaction.cpp                         |   3 +
 be/src/olap/rowset/beta_rowset_writer.cpp          |   8 +-
 be/src/olap/rowset/rowset_writer_context.h         |  10 ++
 .../segment_v2/inverted_index_file_writer.cpp      |   3 +
 .../rowset/segment_v2/inverted_index_file_writer.h |   3 +
 .../segment_v2/inverted_index_fs_directory.cpp     |  18 +++-
 .../segment_v2/inverted_index_fs_directory.h       |   9 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   2 +
 .../rowset/segment_v2/vertical_segment_writer.cpp  |   2 +
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp |   9 +-
 .../test_index_writer_file_cache.groovy            | 116 +++++++++++++++++++++
 11 files changed, 162 insertions(+), 21 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8c109eec1c1..9ed27bad382 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -686,6 +686,9 @@ Status Compaction::do_inverted_index_compaction() {
                        << st;
             return st;
         }
+        for (const auto& writer : inverted_index_file_writers) {
+            writer->set_file_writer_opts(ctx.get_file_writer_options());
+        }
     }
 
     // use tmp file dir to store index files
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index f3a0ade24f3..ec1bba7621b 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -846,13 +846,7 @@ Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& 
rowset_ptr) {
 
 Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
                                                  io::FileWriterPtr& 
file_writer) {
-    io::FileWriterOptions opts {
-            .write_file_cache = _context.write_file_cache,
-            .is_cold_data = _context.is_hot_data,
-            .file_cache_expiration =
-                    _context.file_cache_ttl_sec > 0 && 
_context.newest_write_timestamp > 0
-                            ? _context.newest_write_timestamp + 
_context.file_cache_ttl_sec
-                            : 0};
+    io::FileWriterOptions opts = _context.get_file_writer_options();
     Status st = _context.fs()->create_file(path, &file_writer, &opts);
     if (!st.ok()) {
         LOG(WARNING) << "failed to create writable file. path=" << path << ", 
err: " << st;
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index 0130916bfb4..e13f7efe6e9 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -140,6 +140,16 @@ struct RowsetWriterContext {
             return *storage_resource->fs;
         }
     }
+
+    io::FileWriterOptions get_file_writer_options() const {
+        io::FileWriterOptions opts {
+                .write_file_cache = write_file_cache,
+                .is_cold_data = is_hot_data,
+                .file_cache_expiration = file_cache_ttl_sec > 0 && 
newest_write_timestamp > 0
+                                                 ? newest_write_timestamp + 
file_cache_ttl_sec
+                                                 : 0};
+        return opts;
+    }
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index f2ac0e92265..6eb54878924 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -283,6 +283,7 @@ size_t InvertedIndexFileWriter::write_v1() {
             ram_dir.close();
 
             auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, 
idx_path.c_str());
+            out_dir->set_file_writer_opts(_opts);
 
             auto* out = out_dir->createOutput(idx_name.c_str());
             if (out == nullptr) {
@@ -348,6 +349,8 @@ size_t InvertedIndexFileWriter::write_v2() {
     io::Path index_path 
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
 
     auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, 
index_path.parent_path().c_str());
+    out_dir->set_file_writer_opts(_opts);
+
     std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
     // idx v2 writer != nullptr means memtable on sink node now
     if (_idx_v2_writer != nullptr) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h 
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index b9f9b983e44..024c1dec986 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -71,6 +71,8 @@ public:
                   lucene::store::IndexOutput* output, uint8_t* buffer, int64_t 
bufferLength);
     InvertedIndexStorageFormatPB get_storage_format() const { return 
_storage_format; }
 
+    void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = 
opts; }
+
 private:
     InvertedIndexDirectoryMap _indices_dirs;
     const io::FileSystemSPtr _fs;
@@ -81,6 +83,7 @@ private:
     size_t _file_size = 0;
     // write to disk or stream
     io::FileWriterPtr _idx_v2_writer;
+    io::FileWriterOptions _opts;
 };
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 0443bf345ba..27e03b43da2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -84,9 +84,6 @@ namespace doris::segment_v2 {
 const char* const DorisFSDirectory::WRITE_LOCK_FILE = "write.lock";
 
 class DorisFSDirectory::FSIndexOutput : public 
lucene::store::BufferedIndexOutput {
-private:
-    io::FileWriterPtr _writer;
-
 protected:
     void flushBuffer(const uint8_t* b, const int32_t size) override;
 
@@ -96,6 +93,12 @@ public:
     ~FSIndexOutput() override;
     void close() override;
     int64_t length() const override;
+
+    void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = 
opts; }
+
+private:
+    io::FileWriterPtr _writer;
+    io::FileWriterOptions _opts;
 };
 
 class DorisFSDirectory::FSIndexOutputV2 : public 
lucene::store::BufferedIndexOutput {
@@ -242,7 +245,13 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* 
b, const int32_t len)
 }
 
 void DorisFSDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fs, const 
char* path) {
-    Status status = fs->create_file(path, &_writer);
+    DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput::init.file_cache", {
+        if (fs->type() == io::FileSystemType::S3 && _opts.write_file_cache == 
false) {
+            _CLTHROWA(CL_ERR_IO, "Inverted index failed to enter file cache");
+        }
+    });
+
+    Status status = fs->create_file(path, &_writer, &_opts);
     DBUG_EXECUTE_IF(
             
"DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
             "init",
@@ -579,6 +588,7 @@ lucene::store::IndexOutput* 
DorisFSDirectory::createOutput(const char* name) {
         assert(!exists);
     }
     auto* ret = _CLNEW FSIndexOutput();
+    ret->set_file_writer_opts(_opts);
     try {
         ret->init(_fs, fl);
     } catch (CLuceneError& err) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index b3e0352d7ad..357ac65c678 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -29,6 +29,7 @@
 #include "CLucene/SharedHeader.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
 #include "io/io_common.h"
 
 class CLuceneError;
@@ -46,8 +47,6 @@ class CLUCENE_EXPORT DorisFSDirectory : public 
lucene::store::Directory {
 public:
     static const char* const WRITE_LOCK_FILE;
     static const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k
-private:
-    int filemode;
 
 protected:
     mutable std::mutex _this_lock;
@@ -91,6 +90,12 @@ public:
 
     virtual void init(const io::FileSystemSPtr& fs, const char* path,
                       lucene::store::LockFactory* lock_factory = nullptr);
+
+    void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = 
opts; }
+
+private:
+    int32_t filemode;
+    io::FileWriterOptions _opts;
 };
 
 class CLUCENE_EXPORT DorisRAMFSDirectory : public DorisFSDirectory {
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 36b200fe8e3..f20af3df80a 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -139,6 +139,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, 
uint32_t segment_id,
                 _opts.rowset_ctx->rowset_id.to_string(), segment_id,
                 _tablet_schema->get_inverted_index_storage_format(),
                 std::move(inverted_file_writer));
+        _inverted_index_file_writer->set_file_writer_opts(
+                _opts.rowset_ctx->get_file_writer_options());
     }
 }
 
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index aa9376a8d78..3e23b1fda52 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -120,6 +120,8 @@ 
VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
                 _opts.rowset_ctx->rowset_id.to_string(), segment_id,
                 _tablet_schema->get_inverted_index_storage_format(),
                 std::move(inverted_file_writer));
+        _inverted_index_file_writer->set_file_writer_opts(
+                _opts.rowset_ctx->get_file_writer_options());
     }
 }
 
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp 
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 1db74843697..ee687d18edc 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -165,14 +165,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer(
     int seg_id = this->_num_segment.fetch_add(1, std::memory_order_relaxed);
 
     io::FileWriterPtr file_writer;
-    io::FileWriterOptions opts {
-            .write_file_cache = this->_context.write_file_cache,
-            .is_cold_data = this->_context.is_hot_data,
-            .file_cache_expiration = this->_context.file_cache_ttl_sec > 0 &&
-                                                     
this->_context.newest_write_timestamp > 0
-                                             ? 
this->_context.newest_write_timestamp +
-                                                       
this->_context.file_cache_ttl_sec
-                                             : 0};
+    io::FileWriterOptions opts = this->_context.get_file_writer_options();
 
     auto path = context.segment_path(seg_id);
     auto& fs = context.fs_ref();
diff --git 
a/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy 
b/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy
new file mode 100644
index 00000000000..b26794e3671
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+suite("test_index_writer_file_cache_fault_injection", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return;
+    }
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def testTable1 = "test_index_writer_file_cache_fault_injection_1"
+    def testTable2 = "test_index_writer_file_cache_fault_injection_2"
+
+    sql "DROP TABLE IF EXISTS ${testTable1}"
+    sql """
+        CREATE TABLE ${testTable1} (
+          `@timestamp` int(11) NULL COMMENT "",
+          `clientip` string NULL COMMENT "",
+          `request` string NULL COMMENT "",
+          `status` int(11) NULL COMMENT "",
+          `size` int(11) NULL COMMENT "",
+          INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+          INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = 
"unicode", "support_phrase" = "true") COMMENT '',
+          INDEX status_idx (`status`) USING INVERTED COMMENT '',
+          INDEX size_idx (`size`) USING INVERTED COMMENT ''
+          ) ENGINE=OLAP
+          DUPLICATE KEY(`@timestamp`)
+          COMMENT "OLAP"
+          DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+          PROPERTIES (
+          "replication_allocation" = "tag.location.default: 1",
+          "disable_auto_compaction" = "true"
+        );
+    """
+
+    sql "DROP TABLE IF EXISTS ${testTable2}"
+    sql """
+        CREATE TABLE ${testTable2} (
+          `@timestamp` int(11) NULL COMMENT "",
+          `clientip` string NULL COMMENT "",
+          `request` string NULL COMMENT "",
+          `status` int(11) NULL COMMENT "",
+          `size` int(11) NULL COMMENT "",
+          INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+          INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = 
"unicode", "support_phrase" = "true") COMMENT '',
+          INDEX status_idx (`status`) USING INVERTED COMMENT '',
+          INDEX size_idx (`size`) USING INVERTED COMMENT ''
+          ) ENGINE=OLAP
+          DUPLICATE KEY(`@timestamp`)
+          COMMENT "OLAP"
+          DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+          PROPERTIES (
+          "replication_allocation" = "tag.location.default: 1",
+          "disable_auto_compaction" = "true"
+        );
+    """
+
+    def insert_and_compaction = { tableName ->
+      sql """ INSERT INTO ${tableName} VALUES (893964617, '40.135.0.0', 'GET 
/images/hm_bg.jpg HTTP/1.0', 200, 24736); """
+      sql """ INSERT INTO ${tableName} VALUES (893964653, '232.0.0.0', 'GET 
/images/hm_bg.jpg HTTP/1.0', 200, 3781); """
+      sql """ INSERT INTO ${tableName} VALUES (893964672, '26.1.0.0', 'GET 
/images/hm_bg.jpg HTTP/1.0', 304, 0); """
+
+      def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+      for (def tablet in tablets) {
+        String tablet_id = tablet.TabletId
+        String backend_id = tablet.BackendId
+        def (code, out, err) = 
be_run_full_compaction(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
+        assertEquals(code, 0)
+        def compactJson = parseJson(out.trim())
+        assertEquals("success", compactJson.status.toLowerCase())
+      }
+
+      for (def tablet in tablets) {
+        boolean running = true
+        do {
+            Thread.sleep(1000)
+            String tablet_id = tablet.TabletId
+            String backend_id = tablet.BackendId
+            def (code, out, err) = 
be_get_compaction_status(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Get compaction status: code=" + code + ", out=" + out 
+ ", err=" + err)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+      }
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput::init.file_cache")
+
+        insert_and_compaction.call(testTable1);
+        insert_and_compaction.call(testTable2);
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput::init.file_cache")
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to