This is an automated email from the ASF dual-hosted git repository.

adonisling pushed a commit to branch clucene
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/clucene by this push:
     new 23ae50c  [Improvement](index compaction) Improve index compaction perf by priority queue (#59)
23ae50c is described below

commit 23ae50cc7dec739ab37fb44832ab383a3914f5fb
Author: airborne12 <airborn...@gmail.com>
AuthorDate: Tue Apr 25 14:40:07 2023 +0800

    [Improvement](index compaction) Improve index compaction perf by priority queue (#59)
    
    This pull request improves index compaction performance by introducing a
    priority queue to drive the merge. It changes IndexWriter.cpp,
    SegmentMergeInfo.cpp, and _SegmentMergeInfo.h. The key modifications are:
    
    1. Replacing the previous linear scan for the smallest term, and the
       per-term dest_idx_bitmap it built, with a more efficient
       priority-queue-based approach.
    2. Introducing a postingQueue class, ordered by destination index and then
       destination document ID, that merges the postings of every segment
       positioned on the current term.
    3. Adding a DestDoc struct that records, for each posting being merged, its
       slot in the current match set (srcIdx), its destination index (destIdx),
       and its destination document ID (destDocId).
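    4. Refactoring mergeTerms() to walk terms with a SegmentMergeQueue and merge
       each term's postings with postingQueue. SegmentMergeInfo gains a
       readerIndex field so each posting can be mapped through _trans_vec.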
    
    These changes improve performance during index compaction, making it faster
    and reducing resource usage.
    
    Co-authored-by: airborne12 <airborn...@gmail.com>
---
 src/core/CLucene/index/IndexWriter.cpp      | 312 ++++++++++++----------------
 src/core/CLucene/index/SegmentMergeInfo.cpp |  11 +-
 src/core/CLucene/index/_SegmentMergeInfo.h  |   3 +-
 3 files changed, 135 insertions(+), 191 deletions(-)

diff --git a/src/core/CLucene/index/IndexWriter.cpp 
b/src/core/CLucene/index/IndexWriter.cpp
index 091b51c..b63f331 100644
--- a/src/core/CLucene/index/IndexWriter.cpp
+++ b/src/core/CLucene/index/IndexWriter.cpp
@@ -36,6 +36,7 @@
 #include "_TermInfo.h"
 #include "vp4.h"
 #include <algorithm>
+#include <memory>
 #include <assert.h>
 #include <iostream>
 
@@ -1251,7 +1252,7 @@ void 
IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
                                   std::vector<uint32_t> dest_index_docs) {
     CND_CONDITION(src_dirs.size() > 0, "Source directory not found.");
     CND_CONDITION(dest_dirs.size() > 0, "Destination directory not found.");
-    this->_trans_vec = trans_vec;
+    this->_trans_vec = std::move(trans_vec);
 
     // order mapping: dir -> segment info -> segment reader
     addIndexesSegments(src_dirs);
@@ -1542,123 +1543,138 @@ void 
IndexWriter::writeFields(lucene::store::Directory *d, std::string segment)
     fieldInfos->write(d, Misc::segmentname(segment.c_str(), ".fnm").c_str());
 }
 
-void IndexWriter::mergeTerms() {
-    auto numSrcIndexes = readers.size();
-    // get termEnum of all readers
-    std::vector<TermEnum *> termEnumList(numSrcIndexes);
-    // get termPositions of all readers
-    std::vector<TermPositions *> postingsList(numSrcIndexes);
-    for (int i = 0; i < numSrcIndexes; ++i) {
-        auto reader = readers[i];
-        TermEnum *te = reader->terms();
-        // move the pointer to the first term in enum.
-        te->next();
-        termEnumList[i] = te;
-        postingsList[i] = reader->termPositions();
+struct DestDoc {
+    uint32_t srcIdx{};
+    uint32_t destIdx{};
+    uint32_t destDocId{};
+
+    DestDoc() = default;;
+    DestDoc(uint32_t srcIdx, uint32_t destIdx, uint32_t destDocId) : 
srcIdx(srcIdx), destIdx(destIdx), destDocId(destDocId) {}
+};
+
+class postingQueue : public 
CL_NS(util)::PriorityQueue<DestDoc*,CL_NS(util)::Deletor::Object<DestDoc> >{
+public:
+    explicit postingQueue(int32_t count)
+    {
+        initialize(count, false);
+    }
+    ~postingQueue() override{
+        close();
     }
 
-    while (true) {
-        // smallest term
-        Term *smallestTerm = nullptr;
-        Term *currTerm = nullptr;
-        // 1. find the smallest term
-        for (int i = 0; i < numSrcIndexes; ++i) {
-            TermEnum *termEnum = termEnumList[i];
-            currTerm = termEnum->term();
-            // iterate to the enum end
-            if (currTerm == nullptr) {
-                continue;
-            }
-            if (infoStream != NULL) {
-                auto ts = currTerm->toString();
-                message(string("current term: ") + Misc::toString(ts));
-                _CLDELETE(ts);
-            }
-            if (smallestTerm == nullptr) {
-                smallestTerm = currTerm;
-            } else {
-                if (currTerm->compareTo(smallestTerm) < 0) {
-                    _CLDECDELETE(smallestTerm);
-                    smallestTerm = currTerm;
-                } else {
-                    _CLDECDELETE(currTerm);
-                }
-            }
+    void close() {
+        clear();
+    }
+protected:
+    bool lessThan(DestDoc* a, DestDoc* b) override {
+        if (a->destIdx == b->destIdx) {
+            return a->destDocId < b->destDocId;
+        } else {
+            return a->destIdx < b->destIdx;
         }
+    }
 
-        // stop loop when there is no term left
-        if (smallestTerm == nullptr) {
-            break;
-        }
+};
 
-        // smallest term's field can not be blank string, but value can be 
empty.
-        assert(smallestTerm->field() != LUCENE_BLANK_STRING);
+void IndexWriter::mergeTerms() {
+    auto queue = _CLNEW SegmentMergeQueue(readers.size());
+    auto numSrcIndexes = readers.size();
+    //std::vector<TermPositions *> postingsList(numSrcIndexes);
 
-        if (infoStream != NULL) {
-            auto ts = smallestTerm->toString();
-            message(string("smallest term: ") + Misc::toString(ts));
-            _CLDELETE(ts);
+
+    int32_t base = 0;
+    IndexReader *reader = nullptr;
+    SegmentMergeInfo *smi = nullptr;
+
+    for (int i = 0; i < numSrcIndexes; ++i) {
+        reader = readers[i];
+
+        TermEnum *termEnum = reader->terms();
+        smi = _CLNEW SegmentMergeInfo(base, termEnum, reader, i);
+
+        base += reader->numDocs();
+        if (smi->next()) {
+            queue->put(smi);
+        } else {
+            smi->close();
+            _CLDELETE(smi);
         }
+    }
+    auto **match = _CL_NEWARRAY(SegmentMergeInfo *, readers.size());
 
-        // 2. construct dest_idx_bitmap
-        // dest_idx -> docId
-        std::vector<Roaring> dest_idx_bitmap(numDestIndexes);
+    while (queue->size() > 0) {
+        int32_t matchSize = 0;
 
-        for (int i = 0; i < numSrcIndexes; ++i) {
-            // check whether current term is equal to smallest term
-            TermEnum *termEnum = termEnumList[i];
-            Term *cTerm = termEnum->term();
-            if (cTerm == nullptr) {
-                continue;
-            }
-            if (!cTerm->equals(smallestTerm)) {
-                _CLDECDELETE(cTerm);
-                continue;
-            }
-            _CLDECDELETE(cTerm);
+        match[matchSize++] = queue->pop();
+        Term *smallestTerm = match[0]->term;
 
-            // advance termEnum
-            termEnum->next();
+        SegmentMergeInfo *top = queue->top();
+        while (top != nullptr && smallestTerm->equals(top->term)) {
+            match[matchSize++] = queue->pop();
+            top = queue->top();
+        }
 
-            // get an unpositioned TermPositions enumerator.
-            TermPositions *postings = postingsList[i];
-            // Sets this to the data for a term.
-            postings->seek(smallestTerm);
+        std::vector<std::vector<uint32_t>> docDeltaBuffers(numDestIndexes);
+        auto destPostingQueues = _CLNEW postingQueue(matchSize);
+        std::vector<DestDoc> destDocs(matchSize);
+        for (int i = 0; i < matchSize; ++i) {
+            smi = match[i];
+            TermPositions *postings = smi->getPositions();
 
-            while (postings->next()) {
-                // get src doc id number
+            postings->seek(smi->termEnum);
+            if (postings->next()) {
                 int srcDoc = postings->doc();
-                std::pair<int32_t, uint32_t> p = _trans_vec[i][srcDoc];
+                std::pair<int32_t, uint32_t> p = 
_trans_vec[smi->readerIndex][srcDoc];
                 uint32_t destIdx = p.first;
                 uint32_t destDocId = p.second;
 
-                // add to bitmap
-                dest_idx_bitmap[destIdx].add(destDocId);
+                destDocs[i].srcIdx = i;
+                destDocs[i].destIdx = destIdx;
+                destDocs[i].destDocId = destDocId;
+                destPostingQueues->put(&destDocs[i]);
             }
         }
-
-        if (infoStream != NULL) {
-            message(string("bitmap list size: ") + 
Misc::toString((int64_t)dest_idx_bitmap.size()));
+        while (destPostingQueues->size()) {
+            if (destPostingQueues->top() != nullptr) {
+                auto destDoc = destPostingQueues->pop();
+                auto destIdx = destDoc->destIdx;
+                auto destDocId = destDoc->destDocId;
+
+                docDeltaBuffers[destIdx].push_back(destDocId);
+                if (docDeltaBuffers[destIdx].size() == PFOR_BLOCK_SIZE) {
+                    std::vector<uint8_t> compresseddata(4 * 
docDeltaBuffers[destIdx].size() + PFOR_BLOCK_SIZE);
+                    auto size = P4ENC(docDeltaBuffers[destIdx].data(), 
docDeltaBuffers[destIdx].size(), compresseddata.data());
+                    
freqOutputList[destIdx]->writeVInt(docDeltaBuffers[destIdx].size());
+                    freqOutputList[destIdx]->writeVInt(size);
+                    freqOutputList[destIdx]->writeBytes(reinterpret_cast<const 
uint8_t *>(compresseddata.data()), size);
+                    docDeltaBuffers[destIdx].resize(0);
+                }
+                smi = match[destDoc->srcIdx];
+                TermPositions *postings = smi->getPositions();
+                if (postings->next()) {
+                    int srcDoc = postings->doc();
+                    std::pair<int32_t, uint32_t> p = 
_trans_vec[smi->readerIndex][srcDoc];
+                    destDoc->destIdx = p.first;
+                    destDoc->destDocId = p.second;
+                    destPostingQueues->put(destDoc);
+                }
+            }
+        }
+        if (destPostingQueues != NULL) {
+            destPostingQueues->close();
+            _CLDELETE(destPostingQueues);
         }
-
-        // 3. append term data
         for (int i = 0; i < numDestIndexes; ++i) {
             DefaultSkipListWriter *skipListWriter = skipListWriterList[i];
             CL_NS(store)::IndexOutput *freqOutput = freqOutputList[i];
             CL_NS(store)::IndexOutput *proxOutput = proxOutputList[i];
             TermInfosWriter *termInfosWriter = termInfosWriterList[i];
 
-            Roaring destBitmap = dest_idx_bitmap[i];
-
-            int32_t docNums = destBitmap.cardinality();
+            int32_t docNums = docDeltaBuffers[i].size();
             if (docNums <= 0) {
                 continue;
             }
 
-            if (infoStream != NULL) {
-                message(string("the [") + Misc::toString((int32_t)i) + 
string("] bitmap doc number: ") + Misc::toString(docNums));
-            }
-
             // Get the file pointer of the IndexOutput to the Frequency File
             int64_t freqPointer = freqOutput->getFilePointer();
             // Get the file pointer of the IndexOutput to the Prox File
@@ -1667,116 +1683,44 @@ void IndexWriter::mergeTerms() {
                 proxPointer = proxOutput->getFilePointer();
             }
 
-            // append postings
-            int32_t lastDoc = 0;
-            int32_t df = 0;//docNums;// document frequency
-
-            skipListWriter->resetSkip();
-
-            bool storePayloads = 
fieldInfos->fieldInfo(smallestTerm->field())->storePayloads;
-            int32_t lastPayloadLength = -1;// ensures that we write the first 
length
-
-            // go through the bitmap
-            for (auto it = destBitmap.begin(); it != destBitmap.end(); ++it) {
-                // dest doc id
-                int32_t doc = *it;
+            freqOutput->writeVInt(docDeltaBuffers[i].size());
 
-                //Condition check to see doc is eaqual to or bigger than 
lastDoc
-                if (doc < 0 || (df > 0 && doc > 0 && doc <= lastDoc)) {
-                    _CLTHROWA(CL_ERR_CorruptIndex, (string("docs out of order 
(") + Misc::toString(doc) +
-                                                    " <= " + 
Misc::toString(lastDoc) + " )")
-                                                           .c_str());
-                }
-
-                if ((++df % skipInterval) == 0) {
-                    skipListWriter->setSkipData(lastDoc, storePayloads, 
lastPayloadLength);
-                    skipListWriter->bufferSkip(df);
-                }
-
-                //Calculate a new docCode
-                //use low bit to flag freq=1
-                int32_t docCode = (doc - lastDoc) << 1;
-                lastDoc = doc;
-
-                docDeltaBuffer.push_back(doc);
-
-                if (docDeltaBuffer.size() == PFOR_BLOCK_SIZE) {
-                    std::vector<uint8_t> compresseddata(4 * 
docDeltaBuffer.size() + PFOR_BLOCK_SIZE);
-                    auto size = P4ENC(docDeltaBuffer.data(), 
docDeltaBuffer.size(), compresseddata.data());
-                    freqOutput->writeVInt(docDeltaBuffer.size());
-                    freqOutput->writeVInt(size);
-                    freqOutput->writeBytes(reinterpret_cast<const uint8_t 
*>(compresseddata.data()), size);
-                    docDeltaBuffer.resize(0);
-                }
-
-                /// write positions
-                /** See {@link DocumentWriter#writePostings(Posting[], 
String)} for
-                 *  documentation about the encoding of positions and payloads
-                 */
-                // TODO(luen): fix this when write position file
-                if (false) {
-                    //                    int32_t lastPosition = 0;
-                    //                    // write position deltas
-                    //                    for (int32_t k = 0; k < freq; k++) {
-                    //                        //Get the next position
-                    //                        int32_t position = 
postings->nextPosition();
-                    //                        int32_t delta = position - 
lastPosition;
-                    //                        if (storePayloads) {
-                    //                            size_t payloadLength = 
postings->getPayloadLength();
-                    //                            if (payloadLength == 
lastPayloadLength) {
-                    //                                
proxOutput->writeVInt(delta * 2);
-                    //                            } else {
-                    //                                
proxOutput->writeVInt(delta * 2 + 1);
-                    //                                
proxOutput->writeVInt(payloadLength);
-                    //                                lastPayloadLength = 
payloadLength;
-                    //                            }
-                    //                            if (payloadLength > 0) {
-                    //                                if ( 
payloadBuffer.length < payloadLength ){
-                    //                                    
payloadBuffer.resize(payloadLength);
-                    //                                }
-                    //                                
postings->getPayload(payloadBuffer.values);
-                    //                                
proxOutput->writeBytes(payloadBuffer.values, payloadLength);
-                    //                            }
-                    //                        } else {
-                    //                            proxOutput->writeVInt(delta);
-                    //                        }
-                    //                        lastPosition = position;
-                    //                    }
-                }
-            }
-
-            assert(df > 0);
-            // Done merging this term
-            freqOutput->writeVInt(docDeltaBuffer.size());
             uint32_t lDoc = 0;
-            for (auto &docDelta: docDeltaBuffer) {
+            for (auto &docDelta: docDeltaBuffers[i]) {
                 freqOutput->writeVInt(docDelta - lDoc);
                 lDoc = docDelta;
             }
-            docDeltaBuffer.resize(0);
+            docDeltaBuffers[i].resize(0);
             int64_t skipPointer = skipListWriter->writeSkip(freqOutput);
 
             // write terms
             TermInfo termInfo;
-            termInfo.set(df, freqPointer, proxPointer, (int32_t) (skipPointer 
- freqPointer));
+            termInfo.set(docNums, freqPointer, proxPointer, (int32_t) 
(skipPointer - freqPointer));
             // Write a new TermInfo
             termInfosWriter->add(smallestTerm, &termInfo);
         }
-        // decrement term refcount
-        _CLDECDELETE(smallestTerm);
-    }
 
-    // delete term enum list
-    for (auto te: termEnumList) {
-        te->close();
-        _CLDELETE(te);
+        while (matchSize > 0) {
+            smi = match[--matchSize];
+
+            // Move to the next term in the enumeration of SegmentMergeInfo smi
+            if (smi->next()) {
+                // There still are some terms so restore smi in the queue
+                queue->put(smi);
+
+            } else {
+                // Done with a segment
+                // No terms anymore so close this SegmentMergeInfo instance
+                smi->close();
+                _CLDELETE(smi);
+            }
+        }
     }
-    termEnumList.clear();
 
-    // delete term position list
-    for (auto tp: postingsList) {
-        tp->close();
-        _CLDELETE(tp);
+    _CLDELETE_ARRAY(match);
+    if (queue != NULL) {
+        queue->close();
+        _CLDELETE(queue);
     }
 }
 
diff --git a/src/core/CLucene/index/SegmentMergeInfo.cpp 
b/src/core/CLucene/index/SegmentMergeInfo.cpp
index 035cdd2..86d09b8 100644
--- a/src/core/CLucene/index/SegmentMergeInfo.cpp
+++ b/src/core/CLucene/index/SegmentMergeInfo.cpp
@@ -11,12 +11,11 @@
 
 CL_NS_DEF(index)
 
-SegmentMergeInfo::SegmentMergeInfo(const int32_t b, TermEnum* te, IndexReader* 
r):
-    docMap(NULL),
-    termEnum(te),
-    base(b),
-    reader(r)
-{
+SegmentMergeInfo::SegmentMergeInfo(const int32_t b, TermEnum *te, IndexReader 
*r, const int32_t ri) : docMap(NULL),
+                                                                               
                       termEnum(te),
+                                                                               
                       base(b),
+                                                                               
                       reader(r),
+                                                                               
                       readerIndex(ri) {
 //Func - Constructor
 //Pre  - b >= 0
 //       te contains a valid reference to a SegmentTermEnum instance
diff --git a/src/core/CLucene/index/_SegmentMergeInfo.h 
b/src/core/CLucene/index/_SegmentMergeInfo.h
index 87b05cb..87a549c 100644
--- a/src/core/CLucene/index/_SegmentMergeInfo.h
+++ b/src/core/CLucene/index/_SegmentMergeInfo.h
@@ -24,9 +24,10 @@ public:
        Term* term;
        int32_t base;
        IndexReader* reader;
+       int32_t readerIndex;
      
        //Constructor
-       SegmentMergeInfo(const int32_t b, TermEnum* te, IndexReader* r);
+       SegmentMergeInfo(const int32_t b, TermEnum* te, IndexReader* r, int32_t 
readerIndex=0);
 
        //Destructor
        ~SegmentMergeInfo();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to