This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch clucene
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/clucene by this push:
     new 98826578c4e Optimize the compression of inverted index position 
information (#242)
98826578c4e is described below

commit 98826578c4eff5ca2c1bbde7be06d9a84c7e5cfa
Author: zzzxl <33418555+zzzxl1...@users.noreply.github.com>
AuthorDate: Thu Oct 17 22:10:48 2024 +0800

    Optimize the compression of inverted index position information (#242)
---
 src/core/CLucene/index/FieldInfos.cpp           |  18 +-
 src/core/CLucene/index/IndexVersion.h           |   1 +
 src/core/CLucene/index/IndexWriter.cpp          |  34 ++-
 src/core/CLucene/index/IndexWriter.h            |   5 +-
 src/core/CLucene/index/SDocumentWriter.cpp      |  12 +-
 src/core/CLucene/index/SDocumentWriter.h        |   1 +
 src/core/CLucene/index/SegmentTermDocs.cpp      |   4 +-
 src/core/CLucene/index/SegmentTermPositions.cpp |  16 +-
 src/core/CLucene/index/_FieldInfos.h            |   9 +-
 src/core/CLucene/index/_SegmentHeader.h         |  61 +++++
 src/core/CLucene/util/PFORUtil.cpp              |  53 ++++
 src/core/CLucene/util/PFORUtil.h                |  13 +
 src/test/CMakeLists.txt                         |   1 +
 src/test/index/TestIndexCompress.cpp            | 309 ++++++++++++++++++++++++
 src/test/test.h                                 |   1 +
 src/test/tests.cpp                              |   1 +
 16 files changed, 514 insertions(+), 25 deletions(-)

diff --git a/src/core/CLucene/index/FieldInfos.cpp 
b/src/core/CLucene/index/FieldInfos.cpp
index 00e0c4275a5..3fa035661d4 100644
--- a/src/core/CLucene/index/FieldInfos.cpp
+++ b/src/core/CLucene/index/FieldInfos.cpp
@@ -125,11 +125,12 @@ void FieldInfos::addIndexed(const TCHAR** names, const 
bool storeTermVectors, co
 
 void FieldInfos::add(const TCHAR** names, const bool isIndexed, const bool 
storeTermVectors,
                      const bool storePositionWithTermVector, const bool 
storeOffsetWithTermVector,
-                     const bool omitNorms, const bool hasProx, const bool 
storePayloads) {
+                     const bool omitNorms, const bool hasProx, const bool 
storePayloads,
+                                                                               
 IndexVersion indexVersion) {
         size_t i = 0;      
        while ( names[i] != NULL ){
                add(names[i], isIndexed, storeTermVectors, 
storePositionWithTermVector, 
-                       storeOffsetWithTermVector, omitNorms, hasProx, 
storePayloads);
+                       storeOffsetWithTermVector, omitNorms, hasProx, 
storePayloads, indexVersion);
                ++i;
        }
 }
@@ -137,11 +138,13 @@ void FieldInfos::add(const TCHAR** names, const bool 
isIndexed, const bool store
 FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool 
storeTermVector,
                            const bool storePositionWithTermVector,
                            const bool storeOffsetWithTermVector, const bool 
omitNorms,
-                           const bool hasProx, const bool storePayloads) {
+                           const bool hasProx, const bool storePayloads,
+                                                                               
                         IndexVersion indexVersion) {
   FieldInfo* fi = fieldInfo(name);
        if (fi == NULL) {
                return addInternal(name, isIndexed, storeTermVector, 
storePositionWithTermVector,
-                                                                               
                storeOffsetWithTermVector, omitNorms, hasProx, storePayloads);
+                                                                               
                storeOffsetWithTermVector, omitNorms, hasProx, storePayloads,
+                                                                               
                indexVersion);
   } else {
                if (fi->isIndexed != isIndexed) {
                        fi->isIndexed = true;                      // once 
indexed, always index
@@ -164,6 +167,9 @@ FieldInfo* FieldInfos::add(const TCHAR* name, const bool 
isIndexed, const bool s
                if (fi->storePayloads != storePayloads) {
                        fi->storePayloads = true;
                }
+               if (fi->indexVersion_ != indexVersion) {
+                       fi->indexVersion_ = indexVersion;
+               }
        }
        return fi;
 }
@@ -172,10 +178,12 @@ FieldInfo* FieldInfos::addInternal(const TCHAR* name, 
const bool isIndexed,
                                    const bool storeTermVector,
                                    const bool storePositionWithTermVector,
                                    const bool storeOffsetWithTermVector, const 
bool omitNorms,
-                                   const bool hasProx, const bool 
storePayloads) {
+                                   const bool hasProx, const bool 
storePayloads,
+                                                                               
                                                         IndexVersion 
indexVersion) {
        FieldInfo* fi = _CLNEW FieldInfo(name, isIndexed, byNumber.size(), 
storeTermVector,
                                                                                
                                                                
storePositionWithTermVector, storeOffsetWithTermVector,
                                                                                
                                                                omitNorms, 
hasProx, storePayloads);
+       fi->setIndexVersion(indexVersion);
   byNumber.push_back(fi);
        byName.put( fi->name, fi);
        return fi;
diff --git a/src/core/CLucene/index/IndexVersion.h 
b/src/core/CLucene/index/IndexVersion.h
index 448b1872ab4..1320df2c4c7 100644
--- a/src/core/CLucene/index/IndexVersion.h
+++ b/src/core/CLucene/index/IndexVersion.h
@@ -3,6 +3,7 @@
 enum class IndexVersion {
   kV0 = 0,
   kV1 = 1,
+  kV2 = 2,
 
   kNone
 };
\ No newline at end of file
diff --git a/src/core/CLucene/index/IndexWriter.cpp 
b/src/core/CLucene/index/IndexWriter.cpp
index 10dfd68c60d..8a2c50431cc 100644
--- a/src/core/CLucene/index/IndexWriter.cpp
+++ b/src/core/CLucene/index/IndexWriter.cpp
@@ -8,6 +8,7 @@
 
 #include "CLucene/analysis/AnalysisHeader.h"
 #include "CLucene/analysis/Analyzers.h"
+#include "CLucene/config/repl_wchar.h"
 #include "CLucene/document/Document.h"
 #include "CLucene/search/Similarity.h"
 #include "CLucene/store/Directory.h"
@@ -1327,22 +1328,27 @@ void 
IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
     std::vector<lucene::index::IndexWriter *> destIndexWriterList;
     std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList;
     try {
-        // check hasProx
+        // check hasProx, indexVersion
         bool hasProx = false;
+        IndexVersion indexVersion = IndexVersion::kV1;
         {
             if (!readers.empty()) {
                 IndexReader* reader = readers[0];
                 hasProx = reader->getFieldInfos()->hasProx();
+                indexVersion = reader->getFieldInfos()->getIndexVersion();
                 for (int32_t i = 1; i < readers.size(); i++) {
                     if (hasProx != readers[i]->getFieldInfos()->hasProx()) {
                         _CLTHROWA(CL_ERR_IllegalArgument, "src_dirs hasProx 
inconformity");
                     }
+                    if (indexVersion != 
readers[i]->getFieldInfos()->getIndexVersion()) {
+                        _CLTHROWA(CL_ERR_IllegalArgument, "src_dirs 
indexVersion inconformity");
+                    }
                 }
             }
         }
 
         /// merge fields
-        mergeFields(hasProx);
+        mergeFields(hasProx, indexVersion);
 
         /// write fields and create files writers
         for (int j = 0; j < numDestIndexes; j++) {
@@ -1390,7 +1396,7 @@ void 
IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
         }
 
         /// merge terms
-        mergeTerms(hasProx);
+        mergeTerms(hasProx, indexVersion);
 
         /// merge null_bitmap
         mergeNullBitmap(srcNullBitmapValues, nullBitmapIndexOutputList);
@@ -1555,7 +1561,7 @@ void IndexWriter::compareIndexes(lucene::store::Directory 
*other) {
     }
 }
 
-void IndexWriter::mergeFields(bool hasProx) {
+void IndexWriter::mergeFields(bool hasProx, IndexVersion indexVersion) {
     //Create a new FieldInfos
     fieldInfos = _CLNEW FieldInfos();
     //Condition check to see if fieldInfos points to a valid instance
@@ -1570,7 +1576,8 @@ void IndexWriter::mergeFields(bool hasProx) {
         FieldInfo *fi = reader->getFieldInfos()->fieldInfo(j);
         fieldInfos->add(fi->name, fi->isIndexed, fi->storeTermVector,
                         fi->storePositionWithTermVector, 
fi->storeOffsetWithTermVector,
-                        !reader->hasNorms(fi->name), hasProx, 
fi->storePayloads);
+                        !reader->hasNorms(fi->name), hasProx, 
fi->storePayloads,
+                        fi->indexVersion_);
     }
 }
 
@@ -1614,7 +1621,7 @@ protected:
 
 };
 
-void IndexWriter::mergeTerms(bool hasProx) {
+void IndexWriter::mergeTerms(bool hasProx, IndexVersion indexVersion) {
     auto queue = _CLNEW SegmentMergeQueue(readers.size());
     auto numSrcIndexes = readers.size();
     //std::vector<TermPositions *> postingsList(numSrcIndexes);
@@ -1667,6 +1674,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
 
         std::vector<std::vector<uint32_t>> docDeltaBuffers(numDestIndexes);
         std::vector<std::vector<uint32_t>> freqBuffers(numDestIndexes);
+        std::vector<std::vector<uint32_t>> posBuffers(numDestIndexes);
         auto destPostingQueues = _CLNEW postingQueue(matchSize);
         std::vector<DestDoc> destDocs(matchSize);
 
@@ -1758,6 +1766,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
                 auto proxOut = proxOutputList[destIdx];
                 auto& docDeltaBuffer = docDeltaBuffers[destIdx];
                 auto& freqBuffer = freqBuffers[destIdx];
+                auto& posBuffer = posBuffers[destIdx];
                 auto skipWriter = skipListWriterList[destIdx];
                 auto& df = dfs[destIdx];
                 auto& lastDoc = lastDocs[destIdx];
@@ -1776,6 +1785,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
                     encode(freqOut, docDeltaBuffer, true);
                     if (hasProx) {
                         encode(freqOut, freqBuffer, false);
+                        if (indexVersion >= IndexVersion::kV2) {
+                            PforUtil::encodePos(proxOut, posBuffer);
+                        }
                     }
 
                     skipWriter->setSkipData(lastDoc, false, -1);
@@ -1791,7 +1803,11 @@ void IndexWriter::mergeTerms(bool hasProx) {
                     for (int32_t i = 0; i < descPositions.size(); i++) {
                         int32_t position = descPositions[i];
                         int32_t delta = position - lastPosition;
-                        proxOut->writeVInt(delta);
+                        if (indexVersion >= IndexVersion::kV2) {
+                            posBuffer.push_back(delta);
+                        } else {
+                            proxOut->writeVInt(delta);
+                        }
                         lastPosition = position;
                     }
                     freqBuffer.push_back(destFreq);
@@ -1828,6 +1844,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
             {
                 auto& docDeltaBuffer = docDeltaBuffers[i];
                 auto& freqBuffer = freqBuffers[i];
+                auto& posBuffer = posBuffers[i];
 
                 freqOutput->writeByte((char)CodeMode::kDefault);
                 freqOutput->writeVInt(docDeltaBuffer.size());
@@ -1851,6 +1868,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
                 }
                 docDeltaBuffer.resize(0);
                 freqBuffer.resize(0);
+                if (indexVersion >= IndexVersion::kV2) {
+                    PforUtil::encodePos(proxOutput, posBuffer);
+                }
             }
             
             int64_t skipPointer = skipListWriter->writeSkip(freqOutput);
diff --git a/src/core/CLucene/index/IndexWriter.h 
b/src/core/CLucene/index/IndexWriter.h
index 7cfb67d2ca7..4f32ede24e6 100644
--- a/src/core/CLucene/index/IndexWriter.h
+++ b/src/core/CLucene/index/IndexWriter.h
@@ -7,6 +7,7 @@
 #ifndef _lucene_index_IndexWriter_
 #define _lucene_index_IndexWriter_
 
+#include "CLucene/index/IndexVersion.h"
 #include "CLucene/util/VoidList.h"
 #include "CLucene/util/Array.h"
 
@@ -320,11 +321,11 @@ public:
                             std::vector<uint32_t> dest_index_docs);
 
     // create new fields info
-    void mergeFields(bool hasProx);
+    void mergeFields(bool hasProx, IndexVersion indexVersion);
     // write fields info file
     void writeFields(lucene::store::Directory* d, std::string segment);
     // merge terms and write files
-    void mergeTerms(bool hasProx);
+    void mergeTerms(bool hasProx, IndexVersion indexVersion);
     // merge null_bitmap
     void mergeNullBitmap(std::vector<std::vector<uint32_t>> srcBitmapValues, 
std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList);
 
diff --git a/src/core/CLucene/index/SDocumentWriter.cpp 
b/src/core/CLucene/index/SDocumentWriter.cpp
index 4ff262f86d0..d2b80d89d76 100644
--- a/src/core/CLucene/index/SDocumentWriter.cpp
+++ b/src/core/CLucene/index/SDocumentWriter.cpp
@@ -1222,6 +1222,9 @@ void 
SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
                 encode(freqOut, docDeltaBuffer, true);
                 if (hasProx_) {
                     encode(freqOut, freqBuffer, false);
+                    if (indexVersion_ >= IndexVersion::kV2) {
+                        PforUtil::encodePos(proxOut, posBuffer);
+                    }
                 }
 
                 skipListWriter->setSkipData(lastDoc, 
currentFieldStorePayloads, lastPayloadLength);
@@ -1253,7 +1256,11 @@ void 
SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
                 for (int32_t j = 0; j < termDocFreq; j++) {
                     const int32_t code = prox.readVInt();
                     assert(0 == (code & 1));
-                    proxOut->writeVInt(code >> 1);
+                    if (indexVersion_ >= IndexVersion::kV2) {
+                        posBuffer.push_back(code >> 1);
+                    } else {
+                        proxOut->writeVInt(code >> 1);
+                    }
                 }
                 freqBuffer.push_back(termDocFreq);
             }
@@ -1310,6 +1317,9 @@ void 
SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
             }
             docDeltaBuffer.resize(0);
             freqBuffer.resize(0);
+            if (indexVersion_ >= IndexVersion::kV2) {
+                PforUtil::encodePos(proxOut, posBuffer);
+            }
         }
         
         int64_t skipPointer = skipListWriter->writeSkip(freqOut);
diff --git a/src/core/CLucene/index/SDocumentWriter.h 
b/src/core/CLucene/index/SDocumentWriter.h
index 3dd98186635..36a43fc57b3 100644
--- a/src/core/CLucene/index/SDocumentWriter.h
+++ b/src/core/CLucene/index/SDocumentWriter.h
@@ -52,6 +52,7 @@ private:
     std::string segment;// Current segment we are working on
     std::vector<uint32_t> docDeltaBuffer;
     std::vector<uint32_t> freqBuffer;
+    std::vector<uint32_t> posBuffer;
     std::ostream* infoStream{};
     int64_t ramBufferSize;
     // Flush @ this number of docs.  If rarmBufferSize is
diff --git a/src/core/CLucene/index/SegmentTermDocs.cpp 
b/src/core/CLucene/index/SegmentTermDocs.cpp
index e346dc0ca24..a761fec2810 100644
--- a/src/core/CLucene/index/SegmentTermDocs.cpp
+++ b/src/core/CLucene/index/SegmentTermDocs.cpp
@@ -190,7 +190,7 @@ void TermDocsBuffer::refill() {
     cur_doc_ = 0;
     cur_freq_ = 0;
 
-    if (indexVersion_ == IndexVersion::kV1) {
+    if (indexVersion_ >= IndexVersion::kV1) {
         size_ = refillV1();
     } else {
         size_ = refillV0();
@@ -199,7 +199,7 @@ void TermDocsBuffer::refill() {
 
 void TermDocsBuffer::readRange(DocRange* docRange) {
     int32_t size = 0;
-    if (indexVersion_ == IndexVersion::kV1) {
+    if (indexVersion_ >= IndexVersion::kV1) {
         size = refillV1();
     } else {
         size = refillV0();
diff --git a/src/core/CLucene/index/SegmentTermPositions.cpp 
b/src/core/CLucene/index/SegmentTermPositions.cpp
index 1c7db0703c7..5de0da20add 100644
--- a/src/core/CLucene/index/SegmentTermPositions.cpp
+++ b/src/core/CLucene/index/SegmentTermPositions.cpp
@@ -17,6 +17,8 @@ CL_NS_DEF(index)
 SegmentTermPositions::SegmentTermPositions(const SegmentReader* _parent):
        SegmentTermDocs(_parent), proxStream(NULL)// the proxStream will be 
cloned lazily when nextPosition() is called for the first time
        ,lazySkipPointer(-1), lazySkipProxCount(0)
+    , indexVersion_(_parent->_fieldInfos->getIndexVersion())
+    , buffer_(proxStream, indexVersion_)
 {
     CND_CONDITION(_parent != NULL, "Parent is NULL");
 }
@@ -64,14 +66,15 @@ int32_t SegmentTermPositions::nextPosition() {
 }
 
 int32_t SegmentTermPositions::readDeltaPosition() {
-       int32_t delta = proxStream->readVInt();
+       int32_t delta = buffer_.getPos();
        if (currentFieldStoresPayloads) {
                // if the current field stores payloads then
                // the position delta is shifted one bit to the left.
                // if the LSB is set, then we have to read the current
                // payload length
                if ((delta & 1) != 0) {
-                       payloadLength = proxStream->readVInt();
+                       // payloadLength = proxStream->readVInt();
+            _CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload 
flow is not supported at the moment");
                } 
                delta = (int32_t)((uint32_t)delta >> (uint32_t)1);
                needToLoadPayload = true;
@@ -122,7 +125,8 @@ void SegmentTermPositions::skipPositions(const int32_t n) {
 
 void SegmentTermPositions::skipPayload() {
        if (needToLoadPayload && payloadLength > 0) {
-               proxStream->seek(proxStream->getFilePointer() + payloadLength);
+               // proxStream->seek(proxStream->getFilePointer() + 
payloadLength);
+        _CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is 
not supported at the moment");
        }
        needToLoadPayload = false;
 }
@@ -131,6 +135,7 @@ void SegmentTermPositions::lazySkip() {
     if (proxStream == NULL) {
       // clone lazily
       proxStream = parent->proxStream->clone();
+      buffer_.reset(proxStream);
     }
     
     // we might have to skip the current payload
@@ -138,7 +143,7 @@ void SegmentTermPositions::lazySkip() {
     skipPayload();
       
     if (lazySkipPointer != -1) {
-      proxStream->seek(lazySkipPointer);
+      buffer_.seek(lazySkipPointer);
       lazySkipPointer = -1;
     }
      
@@ -166,7 +171,8 @@ uint8_t* SegmentTermPositions::getPayload(uint8_t* data) {
        } else {
                retArray = data;
        }
-       proxStream->readBytes(retArray, payloadLength);
+       // proxStream->readBytes(retArray, payloadLength);
+    _CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not 
supported at the moment");
        needToLoadPayload = false;
        return retArray;
 }
diff --git a/src/core/CLucene/index/_FieldInfos.h 
b/src/core/CLucene/index/_FieldInfos.h
index ed142c4435c..4ddd0f47fa6 100644
--- a/src/core/CLucene/index/_FieldInfos.h
+++ b/src/core/CLucene/index/_FieldInfos.h
@@ -146,7 +146,8 @@ public:
        void add(const TCHAR** names, const bool isIndexed, const bool 
storeTermVector = false,
                                                const bool 
storePositionWithTermVector = false,
                                                const bool 
storeOffsetWithTermVector = false, const bool omitNorms = false,
-                                               const bool hasProx = false, 
const bool storePayloads = false);
+                                               const bool hasProx = false, 
const bool storePayloads = false,
+                                               IndexVersion indexVersion = 
IndexVersion::kV1);
 
         // Merges in information from another FieldInfos. 
        void add(FieldInfos* other);
@@ -167,13 +168,15 @@ public:
        FieldInfo* add(const TCHAR* name, const bool isIndexed, const bool 
storeTermVector = false,
                                                                        const 
bool storePositionWithTermVector = false,
                                                                        const 
bool storeOffsetWithTermVector = false, const bool omitNorms = false,
-                                                                       const 
bool hasProx = false, const bool storePayloads = false);
+                                                                       const 
bool hasProx = false, const bool storePayloads = false,
+                                                                       
IndexVersion indexVersion = IndexVersion::kV1);
 
   // was void
        FieldInfo* addInternal(const TCHAR* name, const bool isIndexed, const 
bool storeTermVector,
                                                                                
                        const bool storePositionWithTermVector,
                                                                                
                        const bool storeOffsetWithTermVector, const bool 
omitNorms,
-                                                                               
                        const bool hasProx, const bool storePayloads);
+                                                                               
                        const bool hasProx, const bool storePayloads,
+                                                                               
                        IndexVersion indexVersion = IndexVersion::kV1);
 
        int32_t fieldNumber(const TCHAR* fieldName)const;
        
diff --git a/src/core/CLucene/index/_SegmentHeader.h 
b/src/core/CLucene/index/_SegmentHeader.h
index 6bf7d1819b7..4fa9b3fc04c 100644
--- a/src/core/CLucene/index/_SegmentHeader.h
+++ b/src/core/CLucene/index/_SegmentHeader.h
@@ -7,6 +7,7 @@
 #ifndef _lucene_index_SegmentHeader_
 #define _lucene_index_SegmentHeader_
 
+#include "CLucene/util/PFORUtil.h"
 #include "_SegmentInfos.h"
 #include "CLucene/util/BitSet.h"
 //#include "CLucene/util/VoidMap.h"
@@ -86,6 +87,62 @@ private:
   IndexVersion indexVersion_ = IndexVersion::kV0; 
 };
 
+class TermPostingsBuffer {
+public:
+  TermPostingsBuffer(CL_NS(store)::IndexInput* proxStream, IndexVersion 
indexVersion)
+    : poss_(PforUtil::blockSize + 3)
+    , proxStream_(proxStream)
+    , indexVersion_(indexVersion) {
+  }
+
+  ~TermPostingsBuffer() {
+    size_ = 0;
+    cur_pos_ = 0;
+    proxStream_ = nullptr;
+  }
+
+  int32_t getPos() {
+    if (indexVersion_ >= IndexVersion::kV2) {
+      if (cur_pos_ >= size_) {
+          refill();
+      }
+      return poss_[cur_pos_++];
+    } else {
+      return proxStream_->readVInt();
+    }
+  }
+
+  void seek(int64_t skipPointer) {
+    if (indexVersion_ >= IndexVersion::kV2) {
+      size_ = 0;
+      cur_pos_ = 0;
+    }
+    proxStream_->seek(skipPointer);
+  }
+
+  void reset(CL_NS(store)::IndexInput* proxStream) {
+    size_ = 0;
+    cur_pos_ = 0;
+    proxStream_ = proxStream;
+  }
+
+private:
+  void refill() { 
+    cur_pos_ = 0;
+    size_ = PforUtil::decodePos(proxStream_, poss_);
+  }
+
+private:
+  uint32_t size_ = 0;
+
+  uint32_t cur_pos_ = 0;
+  std::vector<uint32_t> poss_;
+
+  CL_NS(store)::IndexInput* proxStream_ = nullptr;
+
+  IndexVersion indexVersion_ = IndexVersion::kV1;
+};
+
 class SegmentTermDocs:public virtual TermDocs {
 protected:
   const SegmentReader* parent;
@@ -232,6 +289,10 @@ private:
   int32_t doc() const{ return SegmentTermDocs::doc(); }
   int32_t freq() const{ return SegmentTermDocs::freq(); }
   bool skipTo(const int32_t target){ return SegmentTermDocs::skipTo(target); }
+
+private:
+  IndexVersion indexVersion_ = IndexVersion::kV0; 
+  TermPostingsBuffer buffer_;
 };
 
 
diff --git a/src/core/CLucene/util/PFORUtil.cpp 
b/src/core/CLucene/util/PFORUtil.cpp
index ae27f521553..8ebad306a4e 100644
--- a/src/core/CLucene/util/PFORUtil.cpp
+++ b/src/core/CLucene/util/PFORUtil.cpp
@@ -20,6 +20,11 @@
 #include <cpuid.h>
 #endif
 
+#include "CLucene/CLConfig.h"
+#include "CLucene/index/CodeMode.h"
+
+CL_NS_USE(index)
+
 namespace {
 using DEC_FUNC = size_t (*)(unsigned char *__restrict, size_t, uint32_t 
*__restrict);
 using ENC_FUNC = size_t (*)(uint32_t *__restrict in, size_t n, unsigned char 
*__restrict out);
@@ -129,3 +134,51 @@ size_t P4ENC(uint32_t *__restrict in, size_t n, unsigned 
char *__restrict out) {
 size_t P4NZENC(uint32_t *__restrict in, size_t n, unsigned char *__restrict 
out) {
     return g_p4nzenc(in, n, out);
 }
+
+void PforUtil::encodePos(IndexOutput* out, std::vector<uint32_t>& buffer) {
+    auto encode = [&out, &buffer](size_t offset, size_t size, CodeMode mode) {
+        out->writeByte((char)mode);
+        out->writeVInt(size);
+        if (mode == CodeMode::kPfor) {
+            std::vector<uint8_t> compress(4 * size + PFOR_BLOCK_SIZE);
+            size_t compressSize = P4NZENC(buffer.data() + offset, size, 
compress.data());
+            out->writeVInt(compressSize);
+            out->writeBytes(reinterpret_cast<const uint8_t*>(compress.data()), 
compressSize);
+        } else if (mode == CodeMode::kDefault) {
+            for (size_t i = 0; i < size; i++) {
+                out->writeVInt(buffer[offset + i]);
+            }
+        }
+    };
+
+    size_t i = 0;
+    size_t totalSize = buffer.size();
+    while (i < totalSize) {
+        size_t remainingElements = totalSize - i;
+        if (remainingElements >= blockSize) {
+            encode(i, blockSize, CodeMode::kPfor);
+            i += blockSize;
+        } else {
+            encode(i, remainingElements, CodeMode::kDefault);
+            break;
+        }
+    }
+
+    buffer.resize(0);
+}
+
+uint32_t PforUtil::decodePos(IndexInput* in, std::vector<uint32_t>& buffer) {
+    CodeMode mode = static_cast<CodeMode>(in->readByte());
+    uint32_t size = in->readVInt();
+    if (mode == CodeMode::kPfor) {
+        uint32_t serializedSize = in->readVInt();
+        std::vector<uint8_t> buf(serializedSize + PFOR_BLOCK_SIZE);
+        in->readBytes(buf.data(), serializedSize);
+        P4NZDEC(buf.data(), size, buffer.data());
+    } else {
+        for (uint32_t i = 0; i < size; i++) {
+            buffer[i] = in->readVInt();
+        }
+    }
+    return size;
+}
\ No newline at end of file
diff --git a/src/core/CLucene/util/PFORUtil.h b/src/core/CLucene/util/PFORUtil.h
index 29acb7fe7a6..583f5c9a25e 100644
--- a/src/core/CLucene/util/PFORUtil.h
+++ b/src/core/CLucene/util/PFORUtil.h
@@ -18,9 +18,22 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <vector>
+
+#include "CLucene.h"
+#include "CLucene/store/IndexOutput.h"
+
+CL_NS_USE(store)
 
 size_t P4DEC(unsigned char *__restrict in, size_t n, uint32_t *__restrict out);
 size_t P4NZDEC(unsigned char *__restrict in, size_t n, uint32_t *__restrict 
out);
 size_t P4ENC(uint32_t *__restrict in, size_t n, unsigned char *__restrict out);
 size_t P4NZENC(uint32_t *__restrict in, size_t n, unsigned char *__restrict 
out);
 
+class PforUtil {
+public:
+    static constexpr size_t blockSize = 128;
+
+    static void encodePos(IndexOutput* out, std::vector<uint32_t>& buffer);
+    static uint32_t decodePos(IndexInput* out, std::vector<uint32_t>& buffer);
+};
\ No newline at end of file
diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt
index fa8e4d3db03..f48b0ff76eb 100644
--- a/src/test/CMakeLists.txt
+++ b/src/test/CMakeLists.txt
@@ -86,6 +86,7 @@ SET(test_files ./tests.cpp
         ./search/spans/TestSpanExplanationsOfNonMatches.cpp
         ./search/spans/TestSpanExplanationsOfNonMatches.h
         ./index/TestIndexCompaction.cpp
+        ./index/TestIndexCompress.cpp
         ./index/TestIndexModifier.cpp
         ./index/TestIndexWriter.cpp
         ./index/TestIndexModifier.cpp
diff --git a/src/test/index/TestIndexCompress.cpp 
b/src/test/index/TestIndexCompress.cpp
new file mode 100644
index 00000000000..3105c4649c6
--- /dev/null
+++ b/src/test/index/TestIndexCompress.cpp
@@ -0,0 +1,309 @@
+#include <CLucene.h> // IWYU pragma: keep
+#include <CLucene/index/IndexReader.h>
+#include <CLucene/search/query/TermPositionIterator.h>
+#include <CLucene/util/stringUtil.h>
+
+#include <ctime>
+#include <exception>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "CLucene/analysis/Analyzers.h"
+#include "CLucene/index/IndexVersion.h"
+#include "CLucene/index/Term.h"
+#include "CLucene/store/FSDirectory.h"
+#include "test.h"
+
+CL_NS_USE(search)
+CL_NS_USE(store)
+CL_NS_USE(index)
+CL_NS_USE(util)
+
+static constexpr int32_t doc_count = 10000;
+
+#define FINALLY(eptr, finallyBlock)       \
+    {                                     \
+        finallyBlock;                     \
+        if (eptr) {                       \
+            std::rethrow_exception(eptr); \
+        }                                 \
+    }
+
+int32_t getDaySeed() {
+    std::time_t now = std::time(nullptr);
+    std::tm* localTime = std::localtime(&now);
+    localTime->tm_sec = 0;
+    localTime->tm_min = 0;
+    localTime->tm_hour = 0;
+    return static_cast<int32_t>(std::mktime(localTime) / (60 * 60 * 24));
+}
+
+static std::string generateRandomIP() {
+    std::string ip_v4;
+    ip_v4.append(std::to_string(rand() % 256));
+    ip_v4.append(".");
+    ip_v4.append(std::to_string(rand() % 256));
+    ip_v4.append(".");
+    ip_v4.append(std::to_string(rand() % 256));
+    ip_v4.append(".");
+    ip_v4.append(std::to_string(rand() % 256));
+    return ip_v4;
+}
+
+static void write_index(const std::string& name, RAMDirectory* dir, 
IndexVersion index_version,
+                        const std::vector<std::string>& datas) {
+    auto* analyzer = _CLNEW lucene::analysis::SimpleAnalyzer<char>;
+    analyzer->set_stopwords(nullptr);
+    auto* indexwriter = _CLNEW lucene::index::IndexWriter(dir, analyzer, true);
+    indexwriter->setRAMBufferSizeMB(512);
+    indexwriter->setMaxBufferedDocs(-1);
+    indexwriter->setMaxFieldLength(0x7FFFFFFFL);
+    indexwriter->setMergeFactor(1000000000);
+    indexwriter->setUseCompoundFile(false);
+
+    auto* char_string_reader = _CLNEW lucene::util::SStringReader<char>;
+
+    auto* doc = _CLNEW lucene::document::Document();
+    int32_t field_config = lucene::document::Field::STORE_NO;
+    field_config |= lucene::document::Field::INDEX_NONORMS;
+    field_config |= lucene::document::Field::INDEX_TOKENIZED;
+    auto field_name = std::wstring(name.begin(), name.end());
+    auto* field = _CLNEW lucene::document::Field(field_name.c_str(), 
field_config);
+    field->setOmitTermFreqAndPositions(false);
+    field->setIndexVersion(index_version);
+    doc->add(*field);
+
+    for (const auto& data : datas) {
+        char_string_reader->init(data.data(), data.size(), false);
+        auto* stream = analyzer->reusableTokenStream(field->name(), 
char_string_reader);
+        field->setValue(stream);
+        indexwriter->addDocument(doc);
+    }
+
+    indexwriter->close();
+
+    _CLLDELETE(indexwriter);
+    _CLLDELETE(doc);
+    _CLLDELETE(analyzer);
+    _CLLDELETE(char_string_reader);
+}
+
+static void read_index(RAMDirectory* dir, int32_t doc_count) {
+    auto* reader = IndexReader::open(dir);
+
+    std::exception_ptr eptr;
+    try {
+        if (doc_count != reader->numDocs()) {
+            std::string msg = "doc_count: " + std::to_string(doc_count) +
+                              ", numDocs: " + 
std::to_string(reader->numDocs());
+            _CLTHROWA(CL_ERR_IllegalArgument, msg.c_str());
+        }
+
+        Term* term = nullptr;
+        TermEnum* enumerator = nullptr;
+        try {
+            enumerator = reader->terms();
+            while (enumerator->next()) {
+                term = enumerator->term();
+
+                auto* term_pos = reader->termPositions(term);
+
+                std::exception_ptr eptr;
+                try {
+                    TermPositionIterator iter(term_pos);
+                    int32_t doc = 0;
+                    while ((doc = iter.nextDoc()) != INT32_MAX) {
+                        for (int32_t i = 0; i < iter.freq(); i++) {
+                            int32_t pos = iter.nextPosition();
+                            if (pos < 0 || pos > 3) {
+                                std::string msg = "pos: " + 
std::to_string(pos);
+                                _CLTHROWA(CL_ERR_IllegalArgument, msg.c_str());
+                            }
+                        }
+                    }
+                } catch (...) {
+                    eptr = std::current_exception();
+                }
+                FINALLY(eptr, { _CLDELETE(term_pos); })
+
+                _CLDECDELETE(term);
+            }
+        }
+        _CLFINALLY({
+            _CLDECDELETE(term);
+            enumerator->close();
+            _CLDELETE(enumerator);
+        })
+
+    } catch (...) {
+        eptr = std::current_exception();
+    }
+    FINALLY(eptr, {
+        reader->close();
+        _CLLDELETE(reader);
+    })
+}
+
+static void index_compaction(RAMDirectory* tmp_dir, 
std::vector<lucene::store::Directory*> srcDirs,
+                             std::vector<lucene::store::Directory*> destDirs, 
int32_t count) {
+    auto* analyzer = _CLNEW lucene::analysis::SimpleAnalyzer<char>;
+    auto* indexwriter = _CLNEW lucene::index::IndexWriter(tmp_dir, analyzer, 
true);
+
+    std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec(
+            srcDirs.size(), std::vector<std::pair<uint32_t, uint32_t>>(count));
+    int32_t idx = 0;
+    int32_t id = 0;
+    for (int32_t i = 0; i < count; i++) {
+        for (int32_t j = 0; j < srcDirs.size(); j++) {
+            if (id == count * destDirs.size()) {
+                idx++;
+                id = 0;
+            }
+            trans_vec[j][i] = std::make_pair(idx, id++);
+        }
+    }
+
+    std::vector<uint32_t> dest_index_docs(destDirs.size());
+    for (int32_t i = 0; i < destDirs.size(); i++) {
+        dest_index_docs[i] = count * destDirs.size();
+    }
+
+    std::exception_ptr eptr;
+    try {
+        indexwriter->indexCompaction(srcDirs, destDirs, trans_vec, 
dest_index_docs);
+    } catch (...) {
+        eptr = std::current_exception();
+    }
+    FINALLY(eptr, {
+        indexwriter->close();
+        _CLDELETE(indexwriter);
+        _CLDELETE(analyzer);
+    })
+}
+
+void TestIndexCompressV2(CuTest* tc) {
+    std::srand(getDaySeed());
+
+    std::string name = "v2_field_name";
+    std::vector<std::string> datas;
+    for (int32_t i = 0; i < doc_count; i++) {
+        std::string ip_v4 = generateRandomIP();
+        datas.emplace_back(ip_v4);
+    }
+
+    RAMDirectory dir;
+    write_index(name, &dir, IndexVersion::kV2, datas);
+
+    try {
+        read_index(&dir, doc_count);
+    } catch (...) {
+        assertTrue(false);
+    }
+
+    std::cout << "\nTestIndexCompressV2 sucess" << std::endl;
+}
+
+void TestIndexCompactionV2(CuTest* tc) {
+    std::srand(getDaySeed());
+    std::string name = "field_name";
+
+    // index v2
+    RAMDirectory in_dir;
+    {
+        std::vector<std::string> datas;
+        for (int32_t i = 0; i < doc_count; i++) {
+            std::string ip_v4 = generateRandomIP();
+            datas.emplace_back(ip_v4);
+        }
+        write_index(name, &in_dir, IndexVersion::kV2, datas);
+    }
+
+    // index compaction v3
+    RAMDirectory outdir1;
+    RAMDirectory outdir2;
+    RAMDirectory outdir3;
+    {
+        std::vector<lucene::store::Directory*> srcDirs;
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        srcDirs.push_back(&in_dir);
+        std::vector<lucene::store::Directory*> destDirs;
+        destDirs.push_back(&outdir1);
+        destDirs.push_back(&outdir2);
+        destDirs.push_back(&outdir3);
+
+        try {
+            RAMDirectory empty_dir;
+            index_compaction(&empty_dir, srcDirs, destDirs, doc_count);
+        } catch (...) {
+            assertTrue(false);
+        }
+    }
+
+    std::cout << "TestIndexCompactionV2 sucess" << std::endl;
+}
+
+void TestIndexCompactionException(CuTest* tc) {
+    std::srand(getDaySeed());
+    std::string name = "field_name";
+
+    // index v1
+    RAMDirectory in_dir_v1;
+    {
+        std::vector<std::string> datas;
+        for (int32_t i = 0; i < 10; i++) {
+            std::string ip_v4 = generateRandomIP();
+            datas.emplace_back(ip_v4);
+        }
+        write_index(name, &in_dir_v1, IndexVersion::kV1, datas);
+    }
+
+    // index v2
+    RAMDirectory in_dir_v2;
+    {
+        std::vector<std::string> datas;
+        for (int32_t i = 0; i < 10; i++) {
+            std::string ip_v4 = generateRandomIP();
+            datas.emplace_back(ip_v4);
+        }
+        write_index(name, &in_dir_v2, IndexVersion::kV2, datas);
+    }
+
+    // index compaction exception 1
+    RAMDirectory out_dir;
+    {
+        std::vector<lucene::store::Directory*> srcDirs;
+        srcDirs.push_back(&in_dir_v1);
+        srcDirs.push_back(&in_dir_v2);
+        std::vector<lucene::store::Directory*> destDirs;
+        destDirs.push_back(&out_dir);
+
+        bool flag = false;
+        try {
+            RAMDirectory empty_dir;
+            index_compaction(&empty_dir, srcDirs, destDirs, 10);
+        } catch (...) {
+            flag = true;
+        }
+        assertTrue(flag);
+    }
+
+    std::cout << "TestIndexCompactionException sucess" << std::endl;
+}
+
+CuSuite* testIndexCompress() {
+    CuSuite* suite = CuSuiteNew(_T("CLucene Index Compress Test"));
+
+    SUITE_ADD_TEST(suite, TestIndexCompressV2);
+    SUITE_ADD_TEST(suite, TestIndexCompactionV2);
+    SUITE_ADD_TEST(suite, TestIndexCompactionException);
+
+    return suite;
+}
diff --git a/src/test/test.h b/src/test/test.h
index 39959327f63..4ec86cb6884 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -85,6 +85,7 @@ CuSuite *testSearchRange(void);
 CuSuite *testMultiPhraseQuery(void);
 CuSuite *testIndexCompaction(void);
 CuSuite *testStringReader(void);
+CuSuite *testIndexCompress(void);
 
 #ifdef TEST_CONTRIB_LIBS
 //CuSuite *testGermanAnalyzer(void);
diff --git a/src/test/tests.cpp b/src/test/tests.cpp
index 5708ff62986..a570dad4663 100644
--- a/src/test/tests.cpp
+++ b/src/test/tests.cpp
@@ -19,6 +19,7 @@ unittest tests[] = {
         {"MultiPhraseQuery", testMultiPhraseQuery},
         {"IndexCompaction", testIndexCompaction},
         {"testStringReader", testStringReader},
+        {"IndexCompress", testIndexCompress},
 #ifdef TEST_CONTRIB_LIBS
         {"chinese", testchinese},
 #endif


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to