github-actions[bot] commented on code in PR #63633:
URL: https://github.com/apache/doris/pull/63633#discussion_r3345163012


##########
be/src/storage/index/inverted/spimi/spimi_index_writer.cpp:
##########
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/index/inverted/spimi/spimi_index_writer.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "common/exception.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "storage/index/inverted/inverted_index_common.h"
+#include "storage/index/inverted/inverted_index_fs_directory.h"
+#include "storage/index/inverted/spimi/index_output_byte_output.h"
+#include "storage/index/inverted/spimi/segment_merger.h"
+
+namespace doris::segment_v2::inverted_index::spimi {
+
+// V4 (pure SPIMI) uses plain "_0.*" names; shadow/debug mode uses
+// "_spimi_0.*" to avoid clashing with the CLucene-emitted segment.
+SpimiIndexWriter::FileNames SpimiIndexWriter::GetFileNames(bool is_v4) {
+    if (is_v4) {
+        return {"_0.tis", "_0.tii", "_0.frq",     "_0.prx",
+                "_0.fnm", "_0.nrm", "segments_1", "segments.gen"};
+    }
+    return {"_spimi_0.tis", "_spimi_0.tii", "_spimi_0.frq",     "_spimi_0.prx",
+            "_spimi_0.fnm", "_spimi_0.nrm", "segments_spimi_1", 
"segments_spimi.gen"};
+}
+
+SpimiIndexWriter::OutputStreams 
SpimiIndexWriter::CreateOutputStreams(DorisFSDirectory* dir,
+                                                                      const 
FileNames& names) {
+    OutputStreams s;
+    s.tis.reset(dir->createOutput(names.tis));
+    s.tii.reset(dir->createOutput(names.tii));
+    s.frq.reset(dir->createOutput(names.frq));
+    s.prx.reset(dir->createOutput(names.prx));
+    s.fnm.reset(dir->createOutput(names.fnm));
+    s.nrm.reset(dir->createOutput(names.nrm));
+    s.seg_n.reset(dir->createOutput(names.seg_n));
+    s.seg_gen.reset(dir->createOutput(names.seg_gen));
+    return s;
+}
+
+SpimiIndexWriter::SpimiIndexWriter(std::string field_name, bool is_v4)
+        : _field_name(std::move(field_name)),
+          _buffer(std::make_unique<SpimiPostingBuffer>()),
+          _spill_manager(std::make_unique<SpillManager>(_field_name, is_v4)) {}
+
+void SpimiIndexWriter::AppendToken(std::string_view term, uint32_t doc_id, 
uint32_t position) {
+    DCHECK(_buffer != nullptr);
+    _buffer->Append(term, doc_id, position);
+}
+
+bool SpimiIndexWriter::Saturated() const {
+    DCHECK(_buffer != nullptr);
+    return _buffer->Saturated();
+}
+
+bool SpimiIndexWriter::ShouldFlush() const {
+    DCHECK(_buffer != nullptr);
+    return _buffer->ShouldFlush();
+}
+
+void SpimiIndexWriter::FlushPending(int32_t doc_count) {
+    DCHECK(_buffer != nullptr);
+    _spill_manager->FlushBuffer(*_buffer, doc_count);
+}
+
+int64_t SpimiIndexWriter::MemoryUsage() const {
+    int64_t buf = _buffer ? static_cast<int64_t>(_buffer->MemoryUsage()) : 0;
+    int64_t spill = _spill_manager ? 
static_cast<int64_t>(_spill_manager->TotalSpillBytes()) : 0;
+    return buf + spill;
+}
+
+void SpimiIndexWriter::Cleanup() {
+    if (_spill_manager) {
+        _spill_manager->CleanupSpillFiles();
+    }
+    _buffer.reset();
+}
+
+EmittedSegmentByteCounts SpimiIndexWriter::EmitDirect(const OutputStreams& 
streams,
+                                                      const SpimiFinishConfig& 
config) {
+    IndexOutputByteOutput tis_bo(streams.tis.get());
+    IndexOutputByteOutput tii_bo(streams.tii.get());
+    IndexOutputByteOutput frq_bo(streams.frq.get());
+    IndexOutputByteOutput prx_bo(streams.prx.get());
+    IndexOutputByteOutput fnm_bo(streams.fnm.get());
+    IndexOutputByteOutput nrm_bo(streams.nrm.get());
+    IndexOutputByteOutput seg_n_bo(streams.seg_n.get());
+    IndexOutputByteOutput seg_gen_bo(streams.seg_gen.get());
+
+    SpimiSegmentSink sink;
+    sink.tis = &tis_bo;
+    sink.tii = &tii_bo;
+    sink.frq = &frq_bo;
+    sink.prx = &prx_bo;
+    sink.fnm = &fnm_bo;
+    sink.nrm = &nrm_bo;
+    sink.segments_n = &seg_n_bo;
+    sink.segments_gen = &seg_gen_bo;
+
+    const bool omit_norms = config.is_v4;
+    // V4 segments are written windowed+inline-capable (the durable read-side
+    // gate is index_version >= kIndexVersionV4, derived in fulltext_writer).
+    // This is SAFE now that windowing is adaptive per term: only df >=
+    // skip_interval terms are windowed; the df=1 long tail stays legacy.
+    const int32_t index_version =
+            config.is_v4 ? FieldInfosWriter::kIndexVersionV4 : 
FieldInfosWriter::kIndexVersionV0;
+    EmittedSegmentByteCounts byte_counts;
+    SpimiFulltextWriter::EmitSegment(*_buffer, sink, /*segment_name=*/"_0", 
config.field_name_utf8,
+                                     config.doc_count, index_version,
+                                     config.omit_term_freq_and_positions, 
omit_norms, &byte_counts);
+    return byte_counts;
+}
+
+void SpimiIndexWriter::EmitMerged(const OutputStreams& streams, const 
SpimiFinishConfig& config) {
+    // Flush remaining buffer contents as one more spill segment.
+    if (_buffer->ShouldFlush() || _buffer->RecordCount() > 0) {
+        _spill_manager->FlushBuffer(*_buffer, config.doc_count);
+    }
+
+    IndexOutputByteOutput tis_bo(streams.tis.get());
+    IndexOutputByteOutput tii_bo(streams.tii.get());
+    IndexOutputByteOutput frq_bo(streams.frq.get());
+    IndexOutputByteOutput prx_bo(streams.prx.get());
+    IndexOutputByteOutput fnm_bo(streams.fnm.get());
+    IndexOutputByteOutput nrm_bo(streams.nrm.get());
+    IndexOutputByteOutput seg_n_bo(streams.seg_n.get());
+    IndexOutputByteOutput seg_gen_bo(streams.seg_gen.get());
+
+    SpimiSegmentSink sink;
+    sink.tis = &tis_bo;
+    sink.tii = &tii_bo;
+    sink.frq = &frq_bo;
+    sink.prx = &prx_bo;
+    sink.fnm = &fnm_bo;
+    sink.nrm = &nrm_bo;
+    sink.segments_n = &seg_n_bo;
+    sink.segments_gen = &seg_gen_bo;
+
+    // Stream each spill back from its node-local tmp file into a
+    // SegmentMerger::Input. The bytes are owned by the Input alone (no 
parallel
+    // copy retained in _spills), so resident RAM is the sum of the loaded 
spills
+    // exactly ONCE — not the COPY-double the old copy-assignment loop 
incurred.
+    const size_t spill_count = _spill_manager->SpillCount();
+    std::vector<SegmentMerger::Input> inputs;
+    inputs.reserve(spill_count);
+    for (size_t i = 0; i < spill_count; ++i) {
+        SegmentMerger::Input inp;
+        // LoadSpill moves bytes off disk into `inp`. For the single-spill case
+        // this feeds Merge() exactly one (moved) Input, keeping the
+        // MergeSingleInput byte-copy fast path reachable. IO errors propagate
+        // as doris::Exception through Finish's try/catch + FINALLY_CLOSE.
+        if (Status st = _spill_manager->LoadSpill(i, inp); !st.ok()) {
+            throw doris::Exception(st);
+        }
+        inputs.push_back(std::move(inp));
+    }
+
+    const bool omit_norms = config.is_v4;
+    // V4 merged segments advertise kIndexVersionV4 in .fnm so the read side
+    // and the merge re-encode path turn on windowed+inline. Spill segments are
+    // written with the SAME V4 gate (see SpillManager), so the single-input
+    // byte-copy fast path stays format-consistent. Adaptive per-term windowing
+    // keeps the df=1 tail legacy, so this is safe.
+    const int32_t index_version =
+            config.is_v4 ? FieldInfosWriter::kIndexVersionV4 : 
FieldInfosWriter::kIndexVersionV0;
+    SegmentMerger::Merge(inputs, sink, /*segment_name=*/"_0", 
config.field_name_utf8,
+                         config.doc_count, index_version, 
config.omit_term_freq_and_positions,
+                         omit_norms);
+}
+
+void SpimiIndexWriter::Finish(DorisFSDirectory* dir, const SpimiFinishConfig& 
config) {
+    // Nothing to emit if the buffer was never created or already cleaned up.
+    if (!HasBuffer()) {
+        return;
+    }
+    // If the buffer is saturated with zero terms we still need to emit
+    // an empty segment so the reader can open it. But if it's truly
+    // empty and not saturated there's no data at all.
+    if (_buffer->RecordCount() == 0 && !_buffer->Saturated() && 
_spill_manager->SpillCount() == 0) {

Review Comment:
   This early return leaves V4 segments with rows but no postings without any 
`_0.*` segment files. That can happen through the new V4 paths when a segment 
contains only NULLs (`add_nulls()` updates `_spimi_doc_count` but appends no 
records), only empty analyzed strings, or only tokens removed by the 
analyzer/stop-word filter. `InvertedIndexColumnWriter::finish()` still writes 
the null bitmap and returns success, but later `SpimiSearcherBuilder::build()` 
unconditionally reads `_0.tis`, `_0.tii`, `_0.fnm`, `segments_1`, and `_0.frq`, 
so a normal `MATCH` query against that tablet errors as a corrupted/missing 
index instead of returning an empty result set. V1/V2/V3 avoid this by adding 
null/empty documents to CLucene and leaving a readable segment. Please emit a 
readable empty SPIMI segment whenever `config.doc_count > 0` (and make the 
reader tolerate its empty postings), or otherwise make the V4 query path handle 
this no-postings segment explicitly. Add an end-to-end V4 test for null-
 only/empty-only or stop-word-only data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to