This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch clucene in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/clucene by this push: new eb52e86a [Feature] add postion and freq support for phrase query (#74) eb52e86a is described below commit eb52e86a74fa7dd390f6421a884e9eaf3e4f4c12 Author: zzzxl <33418555+zzzxl1...@users.noreply.github.com> AuthorDate: Fri May 26 19:29:42 2023 +0800 [Feature] add postion and freq support for phrase query (#74) --- src/core/CLucene/analysis/Analyzers.h | 2 +- src/core/CLucene/document/Field.cpp | 18 +- src/core/CLucene/document/Field.h | 14 +- src/core/CLucene/index/CodeMode.h | 10 + src/core/CLucene/index/DocumentsWriter.cpp | 96 +++-- .../CLucene/index/DocumentsWriterThreadState.cpp | 2 +- src/core/CLucene/index/FieldInfos.cpp | 76 ++-- src/core/CLucene/index/FieldsReader.cpp | 5 + src/core/CLucene/index/IndexWriter.cpp | 195 ++++++++-- src/core/CLucene/index/IndexWriter.h | 6 +- src/core/CLucene/index/SDocumentWriter.cpp | 164 ++++++--- src/core/CLucene/index/SDocumentWriter.h | 21 +- src/core/CLucene/index/SegmentReader.cpp | 2 +- src/core/CLucene/index/SegmentTermDocs.cpp | 125 +++++-- src/core/CLucene/index/SegmentTermPositions.cpp | 4 +- src/core/CLucene/index/SkipListReader.cpp | 61 ++-- src/core/CLucene/index/SkipListWriter.cpp | 22 +- src/core/CLucene/index/_DocumentsWriter.h | 3 + src/core/CLucene/index/_FieldInfos.h | 32 +- src/core/CLucene/index/_SegmentHeader.h | 1 + src/core/CLucene/index/_SkipListReader.h | 8 +- src/core/CLucene/index/_SkipListWriter.h | 20 +- src/core/CLucene/index/_TermInfosWriter.h | 4 +- src/core/CLucene/search/PhraseQuery.cpp | 2 +- src/core/CLucene/search/PhraseScorer.cpp | 5 +- src/core/CLucene/store/FSDirectory.cpp | 7 + src/demo/CMakeLists.txt | 13 +- src/demo/demo.cpp | 401 +++++++++++++++++++++ 28 files changed, 1063 insertions(+), 256 deletions(-) diff --git a/src/core/CLucene/analysis/Analyzers.h b/src/core/CLucene/analysis/Analyzers.h index 57c4e6f6..f5da43c6 100644 --- a/src/core/CLucene/analysis/Analyzers.h +++ b/src/core/CLucene/analysis/Analyzers.h @@ -184,7 +184,7 @@ public: return _CLNEW SimpleTokenizer<T>(reader); } TokenStream* reusableTokenStream(const TCHAR* fieldName, CL_NS(util)::Reader* reader) override{ - auto* tokenizer = dynamic_cast<Tokenizer*>(getPreviousTokenStream()); + auto* tokenizer = static_cast<Tokenizer*>(getPreviousTokenStream()); if (tokenizer == nullptr) { tokenizer = _CLNEW SimpleTokenizer<T>(reader); setPreviousTokenStream(tokenizer); diff --git a/src/core/CLucene/document/Field.cpp b/src/core/CLucene/document/Field.cpp index 2190d467..13bdf54d 100644 --- a/src/core/CLucene/document/Field.cpp +++ b/src/core/CLucene/document/Field.cpp @@ -134,6 +134,17 @@ void Field::setOmitNorms(const bool omitNorms) { config &= ~INDEX_NONORMS; } +bool Field::getOmitTermFreqAndPositions() const { + return (config & INDEX_NOTERMFREQANDPOSITIONS) != 0; +} + +void Field::setOmitTermFreqAndPositions(const bool OmitTermFreqAndPositions) { + if (OmitTermFreqAndPositions) + config |= INDEX_NOTERMFREQANDPOSITIONS; + else + config &= ~INDEX_NOTERMFREQANDPOSITIONS; +} + bool Field::isLazy() const { return lazy; } void Field::setValue(TCHAR* value, const bool duplicateValue) { @@ -201,15 +212,20 @@ void Field::setConfig(const uint32_t x){ newConfig |= INDEX_NONORMS; index = true; } - if ( x & INDEX_TOKENIZED ){ + if ( x & INDEX_TOKENIZED ){ newConfig |= INDEX_TOKENIZED; index = true; if ( x & INDEX_CHS ) newConfig |= INDEX_CHS; + if (x & INDEX_NOTERMFREQANDPOSITIONS) + newConfig |= INDEX_NOTERMFREQANDPOSITIONS; } else if ( x & INDEX_UNTOKENIZED ){ newConfig |= INDEX_UNTOKENIZED; index = true; + } else if (x & INDEX_NOTERMFREQANDPOSITIONS) { + newConfig |= INDEX_NOTERMFREQANDPOSITIONS; + index = true; } if ( !index ) diff --git a/src/core/CLucene/document/Field.h b/src/core/CLucene/document/Field.h index f8dd0980..970d06ee 100644 --- a/src/core/CLucene/document/Field.h +++ b/src/core/CLucene/document/Field.h @@ -82,30 +82,31 @@ public: * to have the above described effect on a field, all instances of that * field must be indexed with NO_NORMS from the beginning. */ - INDEX_NONORMS=128 + INDEX_NONORMS=128, + INDEX_NOTERMFREQANDPOSITIONS=256 }; enum TermVector{ /** Do not store term vectors. */ - TERMVECTOR_NO=256, + TERMVECTOR_NO=512, /** Store the term vectors of each document. A term vector is a list * of the document's terms and their number of occurences in that document. */ - TERMVECTOR_YES=512, + TERMVECTOR_YES=1024, /** * Store the term vector + token position information * * @see #YES */ - TERMVECTOR_WITH_POSITIONS = TERMVECTOR_YES | 1024, + TERMVECTOR_WITH_POSITIONS = TERMVECTOR_YES | 2048, /** * Store the term vector + Token offset information * * @see #YES */ - TERMVECTOR_WITH_OFFSETS = TERMVECTOR_YES | 2048, + TERMVECTOR_WITH_OFFSETS = TERMVECTOR_YES | 4096, /** * Store the term vector + Token position and offset information @@ -260,6 +261,9 @@ public: * This effectively disables indexing boosts and length normalization for this field. */ void setOmitNorms(const bool omitNorms); + + bool getOmitTermFreqAndPositions() const; + void setOmitTermFreqAndPositions(bool omitTermFreqAndPositions); /** * Indicates whether a Field is Lazy or not. The semantics of Lazy loading are such that if a Field is lazily loaded, retrieving diff --git a/src/core/CLucene/index/CodeMode.h b/src/core/CLucene/index/CodeMode.h new file mode 100644 index 00000000..5ac3718f --- /dev/null +++ b/src/core/CLucene/index/CodeMode.h @@ -0,0 +1,10 @@ +#pragma once + +CL_NS_DEF(index) + +enum class CodeMode { + kDefault = 0, + kPfor = 1 +}; + +CL_NS_END \ No newline at end of file diff --git a/src/core/CLucene/index/DocumentsWriter.cpp b/src/core/CLucene/index/DocumentsWriter.cpp index 8c220bc9..055c33a0 100644 --- a/src/core/CLucene/index/DocumentsWriter.cpp +++ b/src/core/CLucene/index/DocumentsWriter.cpp @@ -17,6 +17,7 @@ #include "CLucene/util/CLStreams.h" #include "CLucene/util/Misc.h" #include "CLucene/util/_Arrays.h" +#include "CLucene/index/CodeMode.h" #include "IndexWriter.h" #include "Term.h" #include "_Term.h" @@ -38,8 +39,10 @@ #if defined(USE_AVX2) && defined(__x86_64__) #define P4ENC p4nd1enc256v32 +#define P4NZENC p4nzenc256v32 #else #define P4ENC p4nd1enc128v32 +#define P4NZENC p4nzenc128v32 #endif CL_NS_USE(util) @@ -556,7 +559,7 @@ void DocumentsWriter::writeSegment(std::vector<std::string> &flushedFiles) { IndexOutput *freqOut = directory->createOutput((segmentName + ".frq").c_str()); // TODO:add options in field index IndexOutput *proxOut = nullptr; - if (0) { + if (fieldInfos->hasProx()) { proxOut = directory->createOutput((segmentName + ".prx").c_str()); } @@ -619,7 +622,7 @@ void DocumentsWriter::writeSegment(std::vector<std::string> &flushedFiles) { // Record all files we have flushed flushedFiles.push_back(segmentFileName(IndexFileNames::FIELD_INFOS_EXTENSION)); flushedFiles.push_back(segmentFileName(IndexFileNames::FREQ_EXTENSION)); - if (0) { + if (fieldInfos->hasProx()) { flushedFiles.push_back(segmentFileName(IndexFileNames::PROX_EXTENSION)); } flushedFiles.push_back(segmentFileName(IndexFileNames::TERMS_EXTENSION)); @@ -747,12 +750,34 @@ void DocumentsWriter::appendPostings(ArrayBase<ThreadState::FieldData *> *fields skipListWriter->resetSkip(); + auto encode = [](IndexOutput* out, std::vector<uint32_t>& buffer, bool isDoc) { + std::vector<uint8_t> compress(4 * buffer.size() + PFOR_BLOCK_SIZE); + size_t size = 0; + if (isDoc) { + size = P4ENC(buffer.data(), buffer.size(), compress.data()); + } else { + size = P4NZENC(buffer.data(), buffer.size(), compress.data()); + } + out->writeVInt(size); + out->writeBytes(reinterpret_cast<const uint8_t*>(compress.data()), size); + buffer.resize(0); + }; + // Now termStates has numToMerge FieldMergeStates // which all share the same term. Now we must // interleave the docID streams. while (numToMerge > 0) { if ((++df % skipInterval) == 0) { + if (fieldInfos->hasProx()) { + freqOut->writeByte((char)CodeMode::kPfor); + freqOut->writeVInt(docDeltaBuffer.size()); + // doc + encode(freqOut, docDeltaBuffer, true); + // freq + encode(freqOut, freqBuffer, false); + } + skipListWriter->setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength); skipListWriter->bufferSkip(df); } @@ -801,25 +826,24 @@ void DocumentsWriter::appendPostings(ArrayBase<ThreadState::FieldData *> *fields } } - docDeltaBuffer.push_back(doc); - - /*if (1 == termDocFreq) { - freqOut->writeVInt(newDocCode | 1); + if (fieldInfos->hasProx()) { + docDeltaBuffer.push_back(doc); + freqBuffer.push_back(termDocFreq); } else { - freqOut->writeVInt(newDocCode); - freqOut->writeVInt(termDocFreq); - }*/ - if (docDeltaBuffer.size() == PFOR_BLOCK_SIZE) { - std::vector<uint8_t> compresseddata(4 * docDeltaBuffer.size() + PFOR_BLOCK_SIZE); - auto size = P4ENC(docDeltaBuffer.data(), docDeltaBuffer.size(), compresseddata.data()); - //auto size = p4nd1enc256v32(docDeltaBuffer.data(), docDeltaBuffer.size(), compresseddata.data()); - freqOut->writeVInt(docDeltaBuffer.size()); - freqOut->writeVInt(size); - freqOut->writeBytes(reinterpret_cast<const uint8_t *>(compresseddata.data()), size); - docDeltaBuffer.resize(0); + docDeltaBuffer.push_back(doc); + + /*if (1 == termDocFreq) { + freqOut->writeVInt(newDocCode | 1); + } else { + freqOut->writeVInt(newDocCode); + freqOut->writeVInt(termDocFreq); + }*/ + if (docDeltaBuffer.size() == PFOR_BLOCK_SIZE) { + freqOut->writeVInt(docDeltaBuffer.size()); + encode(freqOut, docDeltaBuffer, true); + } } - if (!minState->nextDoc()) { // Remove from termStates @@ -849,13 +873,35 @@ void DocumentsWriter::appendPostings(ArrayBase<ThreadState::FieldData *> *fields assert(df > 0); // Done merging this term - freqOut->writeVInt(docDeltaBuffer.size()); - uint32_t lDoc = 0; - for (auto &docDelta: docDeltaBuffer) { - freqOut->writeVInt(docDelta - lDoc); - lDoc = docDelta; + { + if (fieldInfos->hasProx()) { + freqOut->writeByte((char)CodeMode::kDefault); + freqOut->writeVInt(docDeltaBuffer.size()); + uint32_t lastDoc = 0; + for (int32_t i = 0; i < docDeltaBuffer.size(); i++) { + uint32_t newDocCode = (docDeltaBuffer[i] - lastDoc) << 1; + lastDoc = docDeltaBuffer[i]; + uint32_t freq = freqBuffer[i]; + if (1 == freq) { + freqOut->writeVInt(newDocCode | 1); + } else { + freqOut->writeVInt(newDocCode); + freqOut->writeVInt(freq); + } + } + docDeltaBuffer.resize(0); + freqBuffer.resize(0); + } else { + freqOut->writeVInt(docDeltaBuffer.size()); + uint32_t lDoc = 0; + for (auto &docDelta: docDeltaBuffer) { + freqOut->writeVInt(docDelta - lDoc); + lDoc = docDelta; + } + docDeltaBuffer.resize(0); + } } - docDeltaBuffer.resize(0); + int64_t skipPointer = skipListWriter->writeSkip(freqOut); // Write term @@ -1158,6 +1204,8 @@ int64_t DocumentsWriter::getRAMUsed() { return numBytesUsed; } +bool DocumentsWriter::hasProx() { return fieldInfos->hasProx(); } + void DocumentsWriter::fillBytes(IndexOutput *out, uint8_t b, int32_t numBytes) { for (int32_t i = 0; i < numBytes; i++) out->writeByte(b); diff --git a/src/core/CLucene/index/DocumentsWriterThreadState.cpp b/src/core/CLucene/index/DocumentsWriterThreadState.cpp index 21e14fe5..765f8952 100644 --- a/src/core/CLucene/index/DocumentsWriterThreadState.cpp +++ b/src/core/CLucene/index/DocumentsWriterThreadState.cpp @@ -208,7 +208,7 @@ void DocumentsWriter::ThreadState::init(Document *doc, int32_t docID) { FieldInfo *fi = _parent->fieldInfos->add(field->name(), field->isIndexed(), field->isTermVectorStored(), field->isStorePositionWithTermVector(), field->isStoreOffsetWithTermVector(), - field->getOmitNorms(), false); + field->getOmitNorms(), !field->getOmitTermFreqAndPositions(), false); if (fi->isIndexed && !fi->omitNorms) { // Maybe grow our buffered norms if (_parent->norms.length <= fi->number) { diff --git a/src/core/CLucene/index/FieldInfos.cpp b/src/core/CLucene/index/FieldInfos.cpp index 2c283d4e..b3260e6d 100644 --- a/src/core/CLucene/index/FieldInfos.cpp +++ b/src/core/CLucene/index/FieldInfos.cpp @@ -29,13 +29,15 @@ FieldInfo::FieldInfo(const TCHAR *_fieldName, const bool _storeOffsetWithTermVector, const bool _storePositionWithTermVector, const bool _omitNorms, + const bool _hasProx, const bool _storePayloads) : name(CLStringIntern::intern(_fieldName )), isIndexed(_isIndexed), number(_fieldNumber), storeTermVector(_storeTermVector), storeOffsetWithTermVector(_storeOffsetWithTermVector), storePositionWithTermVector(_storePositionWithTermVector), - omitNorms(_omitNorms), storePayloads(_storePayloads) { + omitNorms(_omitNorms), hasProx(_hasProx), + storePayloads(_storePayloads) { } FieldInfo::~FieldInfo(){ @@ -44,7 +46,7 @@ FieldInfo::~FieldInfo(){ FieldInfo* FieldInfo::clone() { return _CLNEW FieldInfo(name, isIndexed, number, storeTermVector, storePositionWithTermVector, - storeOffsetWithTermVector, omitNorms, storePayloads); + storeOffsetWithTermVector, omitNorms, hasProx, storePayloads); } FieldInfos::FieldInfos(): @@ -86,10 +88,21 @@ void FieldInfos::add(const Document* doc) { for ( Document::FieldsType::const_iterator itr = fields.begin() ; itr != fields.end() ; itr++ ){ field = *itr; add(field->name(), field->isIndexed(), field->isTermVectorStored(), field->isStorePositionWithTermVector(), - field->isStoreOffsetWithTermVector(), field->getOmitNorms()); + field->isStoreOffsetWithTermVector(), field->getOmitNorms(), !field->getOmitTermFreqAndPositions(), false); } } +bool FieldInfos::hasProx() { + int numFields = byNumber.size(); + for (int i = 0; i < numFields; i++) { + FieldInfo* fi = fieldInfo(i); + if (fi->isIndexed && fi->hasProx) { + return true; + } + } + return false; +} + void FieldInfos::addIndexed(const TCHAR** names, const bool storeTermVectors, const bool storePositionWithTermVector, const bool storeOffsetWithTermVector) { size_t i = 0; @@ -99,26 +112,26 @@ void FieldInfos::addIndexed(const TCHAR** names, const bool storeTermVectors, co } } -void FieldInfos::add(const TCHAR** names,const bool isIndexed, const bool storeTermVectors, - const bool storePositionWithTermVector, const bool storeOffsetWithTermVector, const bool omitNorms, const bool storePayloads) -{ - size_t i=0; +void FieldInfos::add(const TCHAR** names, const bool isIndexed, const bool storeTermVectors, + const bool storePositionWithTermVector, const bool storeOffsetWithTermVector, + const bool omitNorms, const bool hasProx, const bool storePayloads) { + size_t i = 0; while ( names[i] != NULL ){ add(names[i], isIndexed, storeTermVectors, storePositionWithTermVector, - storeOffsetWithTermVector, omitNorms, storePayloads); + storeOffsetWithTermVector, omitNorms, hasProx, storePayloads); ++i; } } -FieldInfo* FieldInfos::add( const TCHAR* name, const bool isIndexed, const bool storeTermVector, - const bool storePositionWithTermVector, const bool storeOffsetWithTermVector, const bool omitNorms, - const bool storePayloads) { - FieldInfo* fi = fieldInfo(name); +FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool storeTermVector, + const bool storePositionWithTermVector, + const bool storeOffsetWithTermVector, const bool omitNorms, + const bool hasProx, const bool storePayloads) { + FieldInfo* fi = fieldInfo(name); if (fi == NULL) { - return addInternal(name, isIndexed, storeTermVector, - storePositionWithTermVector, - storeOffsetWithTermVector, omitNorms, storePayloads); - } else { + return addInternal(name, isIndexed, storeTermVector, storePositionWithTermVector, + storeOffsetWithTermVector, omitNorms, hasProx, storePayloads); + } else { if (fi->isIndexed != isIndexed) { fi->isIndexed = true; // once indexed, always index } @@ -134,6 +147,9 @@ FieldInfo* FieldInfos::add( const TCHAR* name, const bool isIndexed, const bool if (fi->omitNorms != omitNorms) { fi->omitNorms = false; // once norms are stored, always store } + if (fi->hasProx != hasProx) { + fi->hasProx = true; + } if (fi->storePayloads != storePayloads) { fi->storePayloads = true; } @@ -141,13 +157,15 @@ FieldInfo* FieldInfos::add( const TCHAR* name, const bool isIndexed, const bool return fi; } -FieldInfo* FieldInfos::addInternal( const TCHAR* name, const bool isIndexed, const bool storeTermVector, - const bool storePositionWithTermVector, const bool storeOffsetWithTermVector, - const bool omitNorms, const bool storePayloads) { - - FieldInfo* fi = _CLNEW FieldInfo(name, isIndexed, byNumber.size(), storeTermVector, - storePositionWithTermVector, storeOffsetWithTermVector, omitNorms, storePayloads); - byNumber.push_back(fi); +FieldInfo* FieldInfos::addInternal(const TCHAR* name, const bool isIndexed, + const bool storeTermVector, + const bool storePositionWithTermVector, + const bool storeOffsetWithTermVector, const bool omitNorms, + const bool hasProx, const bool storePayloads) { + FieldInfo* fi = _CLNEW FieldInfo(name, isIndexed, byNumber.size(), storeTermVector, + storePositionWithTermVector, storeOffsetWithTermVector, + omitNorms, hasProx, storePayloads); + byNumber.push_back(fi); byName.put( fi->name, fi); return fi; } @@ -208,16 +226,17 @@ void FieldInfos::write(IndexOutput* output) const{ if (fi->storeOffsetWithTermVector) bits |= STORE_OFFSET_WITH_TERMVECTOR; if (fi->omitNorms) bits |= OMIT_NORMS; if (fi->storePayloads) bits |= STORE_PAYLOADS; + if (fi->hasProx) bits |= TERM_FREQ_AND_POSITIONS; - output->writeString(fi->name,_tcslen(fi->name)); - output->writeByte(bits); + output->writeString(fi->name,_tcslen(fi->name)); + output->writeByte(bits); } } void FieldInfos::read(IndexInput* input) { int32_t size = input->readVInt();//read in the size uint8_t bits; - bool isIndexed,storeTermVector,storePositionsWithTermVector,storeOffsetWithTermVector,omitNorms,storePayloads; + bool isIndexed,storeTermVector,storePositionsWithTermVector,storeOffsetWithTermVector,omitNorms,hasProx,storePayloads; for (int32_t i = 0; i < size; ++i){ TCHAR* name = input->readString(); //we could read name into a string buffer, but we can't be sure what the maximum field length will be. bits = input->readByte(); @@ -226,9 +245,10 @@ void FieldInfos::read(IndexInput* input) { storePositionsWithTermVector = (bits & STORE_POSITIONS_WITH_TERMVECTOR) != 0; storeOffsetWithTermVector = (bits & STORE_OFFSET_WITH_TERMVECTOR) != 0; omitNorms = (bits & OMIT_NORMS) != 0; - storePayloads = (bits & STORE_PAYLOADS) != 0; + storePayloads = (bits & STORE_PAYLOADS) != 0; + hasProx = (bits & TERM_FREQ_AND_POSITIONS) != 0; - addInternal(name, isIndexed, storeTermVector, storePositionsWithTermVector, storeOffsetWithTermVector, omitNorms, storePayloads); + addInternal(name, isIndexed, storeTermVector, storePositionsWithTermVector, storeOffsetWithTermVector, omitNorms, hasProx, storePayloads); _CLDELETE_CARRAY(name); } } diff --git a/src/core/CLucene/index/FieldsReader.cpp b/src/core/CLucene/index/FieldsReader.cpp index 04575871..4adbdaff 100644 --- a/src/core/CLucene/index/FieldsReader.cpp +++ b/src/core/CLucene/index/FieldsReader.cpp @@ -225,6 +225,7 @@ void FieldsReader::addFieldLazy(CL_NS(document)::Document& doc, const FieldInfo* //skip over the part that we aren't loading fieldsStream->seek(pointer + toRead); f->setOmitNorms(fi->omitNorms); + f->setOmitTermFreqAndPositions(!fi->hasProx); } else { int32_t length = fieldsStream->readVInt(); int64_t pointer = fieldsStream->getFilePointer(); @@ -232,6 +233,7 @@ void FieldsReader::addFieldLazy(CL_NS(document)::Document& doc, const FieldInfo* fieldsStream->skipChars(length); f = _CLNEW LazyField(this, fi->name, Field::STORE_YES | getIndexType(fi, tokenize) | getTermVectorType(fi), length, pointer); f->setOmitNorms(fi->omitNorms); + f->setOmitTermFreqAndPositions(!fi->hasProx); } doc.add(*f); } @@ -317,6 +319,7 @@ void FieldsReader::addField(CL_NS(document)::Document& doc, const FieldInfo* fi, bits, false); #endif f->setOmitNorms(fi->omitNorms); + f->setOmitTermFreqAndPositions(!fi->hasProx); } else { bits |= Field::STORE_YES; TCHAR* str = fieldsStream->readString(); @@ -324,6 +327,7 @@ void FieldsReader::addField(CL_NS(document)::Document& doc, const FieldInfo* fi, str, // read value bits, false); f->setOmitNorms(fi->omitNorms); + f->setOmitTermFreqAndPositions(!fi->hasProx); } doc.add(*f); } @@ -530,6 +534,7 @@ FieldsReader::FieldForMerge::FieldForMerge(void* _value, ValueType _type, const if (fi->isIndexed && !tokenize) bits |= INDEX_UNTOKENIZED; if (fi->omitNorms) bits |= INDEX_NONORMS; + if (!fi->hasProx) bits |= INDEX_NOTERMFREQANDPOSITIONS; if (fi->storeOffsetWithTermVector) bits |= TERMVECTOR_WITH_OFFSETS; if (fi->storePositionWithTermVector) bits |= TERMVECTOR_WITH_POSITIONS; if (fi->storeTermVector) bits |= TERMVECTOR_YES; diff --git a/src/core/CLucene/index/IndexWriter.cpp b/src/core/CLucene/index/IndexWriter.cpp index 040e2679..c3ea10e7 100644 --- a/src/core/CLucene/index/IndexWriter.cpp +++ b/src/core/CLucene/index/IndexWriter.cpp @@ -22,6 +22,7 @@ #include "CLucene/store/_RAMDirectory.h" #include "CLucene/util/Array.h" #include "CLucene/util/PriorityQueue.h" +#include "CLucene/index/CodeMode.h" #include "MergePolicy.h" #include "MergeScheduler.h" #include "SDocumentWriter.h" @@ -40,9 +41,11 @@ #include <iostream> #if defined(USE_AVX2) && defined(__x86_64__) -#define P4ENC p4nd1enc256v32 +#define P4ENC p4nd1enc256v32 +#define P4NZENC p4nzenc256v32 #else -#define P4ENC p4nd1enc128v32 +#define P4ENC p4nd1enc128v32 +#define P4NZENC p4nzenc128v32 #endif CL_NS_USE(store) @@ -1278,6 +1281,21 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d message(string("index compaction total doc count: ") + Misc::toString(totDocCount)); } + // check hasProx + bool hasProx = false; + { + if (!readers.empty()) { + auto reader = static_cast<SegmentReader*>(readers[0]); + hasProx = reader->getFieldInfos()->hasProx(); + for (int32_t i = 1; i < readers.size(); i++) { + if (hasProx != reader->getFieldInfos()->hasProx()) { + _CLTHROWA(CL_ERR_IllegalArgument, "src_dirs hasProx inconformity"); + } + } + } + } + // std::cout << "hasProx: " << hasProx << std::endl; + numDestIndexes = dest_dirs.size(); // print dest index files @@ -1297,7 +1315,7 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d std::vector<lucene::index::IndexWriter *> destIndexWriterList; try { /// merge fields - mergeFields(); + mergeFields(hasProx); /// write fields and create files writers for (int j = 0; j < numDestIndexes; j++) { @@ -1321,11 +1339,13 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d /// create file writers // Open an IndexOutput to the new Frequency File IndexOutput *freqOut = dest_dir->createOutput((Misc::segmentname(segment.c_str(), ".frq").c_str())); + freqPointers.push_back(0); freqOutputList.push_back(freqOut); // Open an IndexOutput to the new Prox File IndexOutput *proxOut = nullptr; - if (0) { + if (hasProx) { proxOut = dest_dir->createOutput(Misc::segmentname(segment.c_str(), ".prx").c_str()); + proxPointers.push_back(0); } proxOutputList.push_back(proxOut); // Instantiate a new termInfosWriter which will write in directory @@ -1339,7 +1359,7 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d } /// merge terms - mergeTerms(); + mergeTerms(hasProx); } catch (CLuceneError &e) { throw e; } @@ -1515,7 +1535,7 @@ void IndexWriter::addIndexesSegments(std::vector<lucene::store::Directory *> &di } } -void IndexWriter::mergeFields() { +void IndexWriter::mergeFields(bool hasProx) { //Create a new FieldInfos fieldInfos = _CLNEW FieldInfos(); //Condition check to see if fieldInfos points to a valid instance @@ -1532,7 +1552,7 @@ void IndexWriter::mergeFields() { FieldInfo *fi = segmentReader->getFieldInfos()->fieldInfo(j); fieldInfos->add(fi->name, fi->isIndexed, fi->storeTermVector, fi->storePositionWithTermVector, fi->storeOffsetWithTermVector, - !reader->hasNorms(fi->name), fi->storePayloads); + !reader->hasNorms(fi->name), hasProx, fi->storePayloads); } } } @@ -1546,6 +1566,8 @@ struct DestDoc { uint32_t srcIdx{}; uint32_t destIdx{}; uint32_t destDocId{}; + uint32_t destFreq{}; + std::vector<uint32_t> destPositions{}; DestDoc() = default;; DestDoc(uint32_t srcIdx, uint32_t destIdx, uint32_t destDocId) : srcIdx(srcIdx), destIdx(destIdx), destDocId(destDocId) {} @@ -1575,7 +1597,7 @@ protected: }; -void IndexWriter::mergeTerms() { +void IndexWriter::mergeTerms(bool hasProx) { auto queue = _CLNEW SegmentMergeQueue(readers.size()); auto numSrcIndexes = readers.size(); //std::vector<TermPositions *> postingsList(numSrcIndexes); @@ -1606,6 +1628,9 @@ void IndexWriter::mergeTerms() { match[matchSize++] = queue->pop(); Term *smallestTerm = match[0]->term; + // std::wstring ws = smallestTerm->text(); + // std::string name = std::string(ws.begin(), ws.end()); + // std::cout << name << std::endl; SegmentMergeInfo *top = queue->top(); while (top != nullptr && smallestTerm->equals(top->term)) { @@ -1614,6 +1639,7 @@ void IndexWriter::mergeTerms() { } std::vector<std::vector<uint32_t>> docDeltaBuffers(numDestIndexes); + std::vector<std::vector<uint32_t>> freqBuffers(numDestIndexes); auto destPostingQueues = _CLNEW postingQueue(matchSize); std::vector<DestDoc> destDocs(matchSize); for (int i = 0; i < matchSize; ++i) { @@ -1630,24 +1656,97 @@ void IndexWriter::mergeTerms() { destDocs[i].srcIdx = i; destDocs[i].destIdx = destIdx; destDocs[i].destDocId = destDocId; + + if (hasProx) { + int32_t freq = postings->freq(); + destDocs[i].destFreq = freq; + destDocs[i].destPositions.resize(freq); + + for (int32_t j = 0; j < freq; j++) { + int32_t position = postings->nextPosition(); + destDocs[i].destPositions[j] = position; + } + } + destPostingQueues->put(&destDocs[i]); } } + + auto encode = [](IndexOutput* out, std::vector<uint32_t>& buffer, bool isDoc) { + std::vector<uint8_t> compress(4 * buffer.size() + PFOR_BLOCK_SIZE); + size_t size = 0; + if (isDoc) { + size = P4ENC(buffer.data(), buffer.size(), compress.data()); + } else { + size = P4NZENC(buffer.data(), buffer.size(), compress.data()); + } + out->writeVInt(size); + out->writeBytes(reinterpret_cast<const uint8_t*>(compress.data()), size); + buffer.resize(0); + }; + + std::vector<int32_t> dfs(numDestIndexes, 0); + std::vector<int32_t> lastDocs(numDestIndexes, 0); while (destPostingQueues->size()) { if (destPostingQueues->top() != nullptr) { auto destDoc = destPostingQueues->pop(); auto destIdx = destDoc->destIdx; auto destDocId = destDoc->destDocId; + auto destFreq = destDoc->destFreq; + auto& descPositions = destDoc->destPositions; + + auto freqOut = freqOutputList[destIdx]; + auto proxOut = proxOutputList[destIdx]; + auto& docDeltaBuffer = docDeltaBuffers[destIdx]; + auto& freqBuffer = freqBuffers[destIdx]; + auto skipWriter = skipListWriterList[destIdx]; + auto& df = dfs[destIdx]; + auto& lastDoc = lastDocs[destIdx]; + + if (df == 0) { + freqPointers[destIdx] = freqOut->getFilePointer(); + if (hasProx) { + proxPointers[destIdx] = proxOut->getFilePointer(); + } + skipWriter->resetSkip(); + } + + if ((++df % skipInterval) == 0) { + if (hasProx) { + freqOut->writeByte((char)CodeMode::kPfor); + freqOut->writeVInt(docDeltaBuffer.size()); + // doc + encode(freqOut, docDeltaBuffer, true); + // freq + encode(freqOut, freqBuffer, false); + } + skipWriter->setSkipData(lastDoc, false, -1); + skipWriter->bufferSkip(df); + } + + assert(destDocId > lastDoc || df == 1); + lastDoc = destDocId; + + if (hasProx) { + // position + int32_t lastPosition = 0; + for (int32_t i = 0; i < descPositions.size(); i++) { + int32_t position = descPositions[i]; + int32_t delta = position - lastPosition; + proxOut->writeVInt(delta); + lastPosition = position; + } - docDeltaBuffers[destIdx].push_back(destDocId); - if (docDeltaBuffers[destIdx].size() == PFOR_BLOCK_SIZE) { - std::vector<uint8_t> compresseddata(4 * docDeltaBuffers[destIdx].size() + PFOR_BLOCK_SIZE); - auto size = P4ENC(docDeltaBuffers[destIdx].data(), docDeltaBuffers[destIdx].size(), compresseddata.data()); - freqOutputList[destIdx]->writeVInt(docDeltaBuffers[destIdx].size()); - freqOutputList[destIdx]->writeVInt(size); - freqOutputList[destIdx]->writeBytes(reinterpret_cast<const uint8_t *>(compresseddata.data()), size); - docDeltaBuffers[destIdx].resize(0); + docDeltaBuffer.push_back(destDocId); + freqBuffer.push_back(destFreq); + } else { + docDeltaBuffer.push_back(destDocId); + if (docDeltaBuffer.size() == PFOR_BLOCK_SIZE) { + freqOut->writeVInt(docDeltaBuffer.size()); + encode(freqOut, docDeltaBuffer, true); + } } + smi = match[destDoc->srcIdx]; TermPositions *postings = smi->getPositions(); if (postings->next()) { @@ -1655,6 +1754,18 @@ void IndexWriter::mergeTerms() { std::pair<int32_t, uint32_t> p = _trans_vec[smi->readerIndex][srcDoc]; destDoc->destIdx = p.first; destDoc->destDocId = p.second; + + if (hasProx) { + int32_t freq = postings->freq(); + destDoc->destFreq = freq; + destDoc->destPositions.resize(freq); + + for (int32_t j = 0; j < freq; j++) { + int32_t position = postings->nextPosition(); + destDoc->destPositions[j] = position; + } + } + destPostingQueues->put(destDoc); } } @@ -1668,33 +1779,47 @@ void IndexWriter::mergeTerms() { CL_NS(store)::IndexOutput *freqOutput = freqOutputList[i]; CL_NS(store)::IndexOutput *proxOutput = proxOutputList[i]; TermInfosWriter *termInfosWriter = termInfosWriterList[i]; - - int32_t docNums = docDeltaBuffers[i].size(); - if (docNums <= 0) { - continue; - } - - // Get the file pointer of the IndexOutput to the Frequency File - int64_t freqPointer = freqOutput->getFilePointer(); - // Get the file pointer of the IndexOutput to the Prox File + int64_t freqPointer = freqPointers[i]; int64_t proxPointer = 0; - if (proxOutput != nullptr) { - proxPointer = proxOutput->getFilePointer(); + if (hasProx) { + proxPointer = proxPointers[i]; } - freqOutput->writeVInt(docDeltaBuffers[i].size()); - - uint32_t lDoc = 0; - for (auto &docDelta: docDeltaBuffers[i]) { - freqOutput->writeVInt(docDelta - lDoc); - lDoc = docDelta; + if (hasProx) { + auto& docDeltaBuffer = docDeltaBuffers[i]; + auto& freqBuffer = freqBuffers[i]; + + freqOutput->writeByte((char)CodeMode::kDefault); + freqOutput->writeVInt(docDeltaBuffer.size()); + uint32_t lastDoc = 0; + for (int32_t i = 0; i < docDeltaBuffer.size(); i++) { + uint32_t newDocCode = (docDeltaBuffer[i] - lastDoc) << 1; + lastDoc = docDeltaBuffer[i]; + uint32_t freq = freqBuffer[i]; + if (1 == freq) { + freqOutput->writeVInt(newDocCode | 1); + } else { + freqOutput->writeVInt(newDocCode); + freqOutput->writeVInt(freq); + } + } + docDeltaBuffer.resize(0); + freqBuffer.resize(0); + } else { + freqOutput->writeVInt(docDeltaBuffers[i].size()); + uint32_t lDoc = 0; + for (auto &docDelta: docDeltaBuffers[i]) { + freqOutput->writeVInt(docDelta - lDoc); + lDoc = docDelta; + } + docDeltaBuffers[i].resize(0); } - docDeltaBuffers[i].resize(0); + int64_t skipPointer = skipListWriter->writeSkip(freqOutput); // write terms TermInfo termInfo; - termInfo.set(docNums, freqPointer, proxPointer, (int32_t) (skipPointer - freqPointer)); + termInfo.set(dfs[i], freqPointer, proxPointer, (int32_t) (skipPointer - freqPointer)); // Write a new TermInfo termInfosWriter->add(smallestTerm, &termInfo); } diff --git a/src/core/CLucene/index/IndexWriter.h b/src/core/CLucene/index/IndexWriter.h index 1464bf95..b5aabf1b 100644 --- a/src/core/CLucene/index/IndexWriter.h +++ b/src/core/CLucene/index/IndexWriter.h @@ -286,8 +286,10 @@ class CLUCENE_EXPORT IndexWriter:LUCENE_BASE { FieldInfos* fieldInfos; // IndexOutput to the new Frequency File std::vector<CL_NS(store)::IndexOutput*> freqOutputList; + std::vector<int64_t> freqPointers; // IndexOutput to the new Prox File std::vector<CL_NS(store)::IndexOutput*> proxOutputList; + std::vector<int64_t> proxPointers; std::vector<TermInfosWriter*> termInfosWriterList; int32_t skipInterval; int32_t maxSkipLevels; @@ -325,11 +327,11 @@ public: void addIndexesSegments(std::vector<lucene::store::Directory*>& dirs); // create new fields info - void mergeFields(); + void mergeFields(bool hasProx); // write fields info file void writeFields(lucene::store::Directory* d, std::string segment); // merge terms and write files - void mergeTerms(); + void mergeTerms(bool hasProx); // Compare current index with the other void compareIndexes(lucene::store::Directory* other); diff --git a/src/core/CLucene/index/SDocumentWriter.cpp b/src/core/CLucene/index/SDocumentWriter.cpp index 4ef25b9a..6c06519c 100644 --- a/src/core/CLucene/index/SDocumentWriter.cpp +++ b/src/core/CLucene/index/SDocumentWriter.cpp @@ -11,6 +11,7 @@ #include "CLucene/util/CLStreams.h" #include "CLucene/util/Misc.h" #include "CLucene/util/stringUtil.h" +#include "CLucene/index/CodeMode.h" #include "_FieldsWriter.h" #include "_TermInfosWriter.h" @@ -21,11 +22,14 @@ #include <algorithm> #include <vector> +#include <iostream> #if defined(USE_AVX2) && defined(__x86_64__) #define P4ENC p4nd1enc256v32 +#define P4NZENC p4nzenc256v32 #else #define P4ENC p4nd1enc128v32 +#define P4NZENC p4nzenc128v32 #endif CL_NS_USE(util) @@ -166,7 +170,7 @@ void SDocumentsWriter<T>::ThreadState::init(Document *doc, int32_t doc_id) { FieldInfo *fi = _parent->fieldInfos->add(field->name(), field->isIndexed(), field->isTermVectorStored(), field->isStorePositionWithTermVector(), field->isStoreOffsetWithTermVector(), - field->getOmitNorms(), false); + field->getOmitNorms(), !field->getOmitTermFreqAndPositions(), false); if (fi->isIndexed && !fi->omitNorms) { // Maybe grow our buffered norms if (_parent->norms.length <= fi->number) { @@ -413,7 +417,7 @@ void SDocumentsWriter<T>::ThreadState::writeProxByte(uint8_t b) { if (prox[proxUpto] != 0) { proxUpto = postingsPool->allocSlice(prox, proxUpto); prox = postingsPool->buffer; - //p->proxUpto = postingsPool->tOffset; + p->proxUpto = postingsPool->tOffset; assert(prox != nullptr); } prox[proxUpto++] = b; @@ -467,7 +471,7 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) { } while (threadState->p != nullptr && !threadState->postingEquals(tokenText, tokenTextLen)); } - //int32_t proxCode; + int32_t proxCode = 0; try { if (threadState->p != nullptr) { // term seen since last flush if (threadState->docID != threadState->p->lastDocID) {// term not yet seen in this doc @@ -475,17 +479,24 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) { // write it & lastDocCode threadState->freqUpto = threadState->p->freqUpto & BYTE_BLOCK_MASK; threadState->freq = threadState->postingsPool->buffers[threadState->p->freqUpto >> BYTE_BLOCK_SHIFT]; - //if (1 == threadState->p->docFreq) - threadState->writeFreqVInt(threadState->p->lastDocCode | 1); - //else { - // threadState->writeFreqVInt(threadState->p->lastDocCode); - // threadState->writeFreqVInt(threadState->p->docFreq); - // } - threadState->p->freqUpto = threadState->freqUpto + (threadState->p->freqUpto & BYTE_BLOCK_NOT_MASK); - //proxCode = position; + if (fieldInfo->hasProx) { + if (1 == threadState->p->docFreq) + threadState->writeFreqVInt(threadState->p->lastDocCode | 1); + else { + threadState->writeFreqVInt(threadState->p->lastDocCode); + threadState->writeFreqVInt(threadState->p->docFreq); + } + } else { + threadState->writeFreqVInt(threadState->p->lastDocCode | 1); + } - //threadState->p->docFreq = 1; + threadState->p->freqUpto = threadState->freqUpto + (threadState->p->freqUpto & BYTE_BLOCK_NOT_MASK); + + if (fieldInfo->hasProx) { + proxCode = position; + threadState->p->docFreq = 1; + } // Store code so we can write this after we're // done with this new doc @@ -493,9 +504,10 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) { threadState->p->lastDocID = threadState->docID; } else {// term already seen in this doc - //threadState->p->docFreq++; - - //proxCode = position - threadState->p->lastPosition; + if (fieldInfo->hasProx) { + threadState->p->docFreq++; + proxCode = position - threadState->p->lastPosition; + } } } else {// term not seen before if (0 == threadState->postingsFreeCountTS) { @@ -534,24 +546,29 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) { const int32_t upto1 = threadState->postingsPool->newSlice(firstSize); threadState->p->freqStart = threadState->p->freqUpto = threadState->postingsPool->tOffset + upto1; - //const int32_t upto2 = threadState->postingsPool->newSlice(firstSize); - //threadState->p->proxStart = threadState->p->proxUpto = threadState->postingsPool->tOffset + upto2; + if (fieldInfo->hasProx) { + const int32_t upto2 = threadState->postingsPool->newSlice(firstSize); + threadState->p->proxStart = threadState->p->proxUpto = threadState->postingsPool->tOffset + upto2; + } threadState->p->lastDocCode = threadState->docID << 1; threadState->p->lastDocID = threadState->docID; - //threadState->p->docFreq = 1; - //proxCode = position; - } - - //threadState->proxUpto = threadState->p->proxUpto & BYTE_BLOCK_MASK; - //threadState->prox = threadState->postingsPool->buffers[threadState->p->proxUpto >> BYTE_BLOCK_SHIFT]; - //assert(threadState->prox != nullptr); - //threadState->writeProxVInt(proxCode << 1); + if (fieldInfo->hasProx) { + threadState->p->docFreq = 1; + proxCode = position; + } + } - //threadState->p->proxUpto = threadState->proxUpto + (threadState->p->proxUpto & BYTE_BLOCK_NOT_MASK); + threadState->proxUpto = threadState->p->proxUpto & BYTE_BLOCK_MASK; + threadState->prox = threadState->postingsPool->buffers[threadState->p->proxUpto >> BYTE_BLOCK_SHIFT]; + assert(threadState->prox != nullptr); - threadState->p->lastPosition = position++; + if (fieldInfo->hasProx) { + threadState->writeProxVInt(proxCode << 1); + threadState->p->proxUpto = threadState->proxUpto + (threadState->p->proxUpto & BYTE_BLOCK_NOT_MASK); + threadState->p->lastPosition = position++; + } } catch (CLuceneError &t) { throw; } @@ -930,7 +947,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) { IndexOutput *freqOut = directory->createOutput((segmentName + ".frq").c_str()); // TODO:add options in field index IndexOutput *proxOut = nullptr; - if (0) { + if (fieldInfos->hasProx()) { proxOut = directory->createOutput((segmentName + ".prx").c_str()); } @@ -952,7 +969,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) { skipListWriter = _CLNEW DefaultSkipListWriter(termsOut->skipInterval, termsOut->maxSkipLevels, - numDocsInRAM, freqOut, proxOut); + numDocsInRAM, freqOut, proxOut); int32_t start = 0; while (start < numAllFields) { @@ -990,7 +1007,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) { // Record all files we have flushed flushedFiles.push_back(segmentFileName(IndexFileNames::FIELD_INFOS_EXTENSION)); flushedFiles.push_back(segmentFileName(IndexFileNames::FREQ_EXTENSION)); - if (0) { + if (fieldInfos->hasProx()) { flushedFiles.push_back(segmentFileName(IndexFileNames::PROX_EXTENSION)); } flushedFiles.push_back(segmentFileName(IndexFileNames::TERMS_EXTENSION)); @@ -1112,18 +1129,40 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa int64_t freqPointer = freqOut->getFilePointer(); int64_t proxPointer = 0; - if (proxOut != nullptr) { + if (fieldInfos->hasProx()) { proxPointer = proxOut->getFilePointer(); } skipListWriter->resetSkip(); + auto encode = [](IndexOutput* out, std::vector<uint32_t>& buffer, bool isDoc) { + std::vector<uint8_t> compress(4 * buffer.size() + PFOR_BLOCK_SIZE); + size_t size = 0; + if (isDoc) { + size = P4ENC(buffer.data(), buffer.size(), compress.data()); + } else { + size = P4NZENC(buffer.data(), buffer.size(), compress.data()); + } + out->writeVInt(size); + out->writeBytes(reinterpret_cast<const uint8_t*>(compress.data()), size); + buffer.resize(0); + }; + // Now termStates has numToMerge FieldMergeStates // which all share the same term. Now we must // interleave the docID streams. while (numToMerge > 0) { if ((++df % skipInterval) == 0) { + if (fieldInfos->hasProx()) { + freqOut->writeByte((char)CodeMode::kPfor); + freqOut->writeVInt(docDeltaBuffer.size()); + // doc + encode(freqOut, docDeltaBuffer, true); + // freq + encode(freqOut, freqBuffer, false); + } + skipListWriter->setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength); skipListWriter->bufferSkip(df); } @@ -1147,27 +1186,25 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa // Carefully copy over the prox + payload info, // changing the format to match Lucene's segment // format. - if (proxOut != nullptr) { + + if (fieldInfos->hasProx()) { + // position for (int32_t j = 0; j < termDocFreq; j++) { const int32_t code = prox.readVInt(); assert(0 == (code & 1)); proxOut->writeVInt(code >> 1); } - } - - docDeltaBuffer.push_back(doc); - if (docDeltaBuffer.size() == PFOR_BLOCK_SIZE) { - std::vector<uint8_t> compresseddata(4 * docDeltaBuffer.size() + PFOR_BLOCK_SIZE); - auto size = P4ENC(docDeltaBuffer.data(), docDeltaBuffer.size(), compresseddata.data()); - //auto size = p4nd1enc256v32(docDeltaBuffer.data(), docDeltaBuffer.size(), compresseddata.data()); - freqOut->writeVInt(docDeltaBuffer.size()); - freqOut->writeVInt(size); - freqOut->writeBytes(reinterpret_cast<const uint8_t *>(compresseddata.data()), size); - docDeltaBuffer.resize(0); + docDeltaBuffer.push_back(doc); + freqBuffer.push_back(termDocFreq); + } else { + docDeltaBuffer.push_back(doc); + if (docDeltaBuffer.size() == PFOR_BLOCK_SIZE) { + freqOut->writeVInt(docDeltaBuffer.size()); + encode(freqOut, docDeltaBuffer, true); + } } - if (!minState->nextDoc()) { // Remove from termStates @@ -1197,13 +1234,35 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa assert(df > 0); // Done merging this term - freqOut->writeVInt(docDeltaBuffer.size()); - uint32_t lDoc = 0; - for (auto &docDelta: docDeltaBuffer) { - freqOut->writeVInt(docDelta - lDoc); - lDoc = docDelta; + { + if (fieldInfos->hasProx()) { + freqOut->writeByte((char)CodeMode::kDefault); + freqOut->writeVInt(docDeltaBuffer.size()); + uint32_t lastDoc = 0; + for (int32_t i = 0; i < docDeltaBuffer.size(); i++) { + uint32_t newDocCode = (docDeltaBuffer[i] - lastDoc) << 1; + lastDoc = docDeltaBuffer[i]; + uint32_t freq = freqBuffer[i]; + if (1 == freq) { + freqOut->writeVInt(newDocCode | 1); + } else { + freqOut->writeVInt(newDocCode); + freqOut->writeVInt(freq); + } + } + docDeltaBuffer.resize(0); + freqBuffer.resize(0); + } else { + freqOut->writeVInt(docDeltaBuffer.size()); + uint32_t lDoc = 0; + for (auto& docDelta : docDeltaBuffer) { + freqOut->writeVInt(docDelta - lDoc); + lDoc = docDelta; + } + docDeltaBuffer.resize(0); + } } - docDeltaBuffer.resize(0); + int64_t skipPointer = skipListWriter->writeSkip(freqOut); // Write term @@ -1435,6 +1494,11 @@ void SDocumentsWriter<T>::balanceRAM() { } } +template<typename T> +bool SDocumentsWriter<T>::hasProx() { + return fieldInfos->hasProx(); +} + template class SDocumentsWriter<char>; template class SDocumentsWriter<TCHAR>; CL_NS_END diff --git a/src/core/CLucene/index/SDocumentWriter.h b/src/core/CLucene/index/SDocumentWriter.h index 453cb6ae..f1da76fa 100644 --- a/src/core/CLucene/index/SDocumentWriter.h +++ b/src/core/CLucene/index/SDocumentWriter.h @@ -50,6 +50,7 @@ private: bool closed{}; std::string segment;// Current segment we are working on std::vector<uint32_t> docDeltaBuffer; + std::vector<uint32_t> freqBuffer; std::ostream* infoStream{}; int64_t ramBufferSize; @@ -59,11 +60,11 @@ public: struct Posting { int32_t textStart; // Address into char[] blocks where our text is stored int32_t textLen; // our text length - //int32_t docFreq; // # times this term occurs in the current doc + int32_t docFreq = 0; // # times this term occurs in the current doc int32_t freqStart; // Address of first uint8_t[] slice for freq int32_t freqUpto; // Next write address for freq - //int32_t proxStart; // Address of first uint8_t[] slice - //int32_t proxUpto; // Next write address for prox + int32_t proxStart = 0; // Address of first uint8_t[] slice + int32_t proxUpto = 0; // Next write address for prox int32_t lastDocID; // Last docID where this term occurred int32_t lastDocCode; // Code for prior doc int32_t lastPosition;// Last position where this term occurred @@ -598,7 +599,9 @@ public: else freq.bufferOffset = freq.upto = freq.endIndex = 0; - //prox.init(field->threadState->postingsPool, p->proxStart, p->proxUpto); + if (field->fieldInfo->hasProx) { + prox.init(field->threadState->postingsPool, p->proxStart, p->proxUpto); + } // Should always be true bool result = nextDoc(); @@ -612,7 +615,13 @@ public: if (p->lastDocCode != -1) { // Return last doc docID = p->lastDocID; - termFreq = 1;//p->docFreq; + + if (field->fieldInfo->hasProx) { + termFreq = p->docFreq; + } else { + termFreq = 1; + } + p->lastDocCode = -1; return true; } else @@ -781,6 +790,8 @@ public: int64_t getRAMUsed() override {_CLTHROW_NOT_IMPLEMENT} const std::vector<int32_t> *getBufferedDeleteDocIDs() override {_CLTHROW_NOT_IMPLEMENT} + bool hasProx() override; + private: std::vector<std::string>* _files = nullptr; diff --git a/src/core/CLucene/index/SegmentReader.cpp b/src/core/CLucene/index/SegmentReader.cpp index d914de88..0b53de82 100644 --- a/src/core/CLucene/index/SegmentReader.cpp +++ b/src/core/CLucene/index/SegmentReader.cpp @@ -196,7 +196,7 @@ void SegmentReader::initialize(SegmentInfo *si, int32_t readBufferSize, bool doO // so that if an index update removes them we'll still have them freqStream = cfsDir->openInput((segment + ".frq").c_str(), readBufferSize); // TODO: should be true when we could set writing terms positions in field conf flag. - if (0) { + if (_fieldInfos->hasProx()) { proxStream = cfsDir->openInput((segment + ".prx").c_str(), readBufferSize); } // we do not need norms, so we don't read it at all. diff --git a/src/core/CLucene/index/SegmentTermDocs.cpp b/src/core/CLucene/index/SegmentTermDocs.cpp index ae4994a2..f64256d8 100644 --- a/src/core/CLucene/index/SegmentTermDocs.cpp +++ b/src/core/CLucene/index/SegmentTermDocs.cpp @@ -8,16 +8,20 @@ #include "_SegmentHeader.h" #include "CLucene/store/IndexInput.h" +#include "CLucene/index/CodeMode.h" #include "Term.h" #include "vp4.h" #include <assert.h> #include <memory> +#include <iostream> #if defined(USE_AVX2) && defined(__x86_64__) #define P4DEC p4nd1dec256v32 +#define P4NZDEC p4nzdec256v32 #else #define P4DEC p4nd1dec128v32 +#define P4NZDEC p4nzdec128v32 #endif CL_NS_DEF(index) @@ -67,6 +71,7 @@ void SegmentTermDocs::seek(const TermInfo *ti, Term *term) { count = 0; FieldInfo *fi = parent->_fieldInfos->fieldInfo(term->field()); currentFieldStoresPayloads = (fi != NULL) ? fi->storePayloads : false; + hasProx = (fi != nullptr) && fi->hasProx; if (ti == NULL) { df = 0; } else {// punt case @@ -132,47 +137,100 @@ bool SegmentTermDocs::next() { int32_t SegmentTermDocs::read(int32_t *docs, int32_t *freqs, int32_t length) { int32_t i = 0; - //todo: one optimization would be to get the pointer buffer for ram or mmap dirs - //and iterate over them instead of using readByte() intensive functions. - if (i < length && count < df) { + + if (hasProx) { + if (count == df) { + return i; + } + + char mode = freqStream->readByte(); + uint32_t arraySize = freqStream->readVInt(); - if (arraySize < PFOR_BLOCK_SIZE) { - int32_t _docDelta{0}; - while (i < length && count < df && i < arraySize) { - // manually inlined call to next() for speed + std::vector<uint32_t> _docs(arraySize); + std::vector<uint32_t> _freqs(arraySize); + + if (mode == (char)CodeMode::kDefault) { + uint32_t docDelta = 0; + for (uint32_t i = 0; i < arraySize; i++) { uint32_t docCode = freqStream->readVInt(); - _docDelta += docCode; - _doc = _docDelta; - _freq = 1; // _freq is one - count++; - - if (deletedDocs == NULL || (_doc >= 0 && !deletedDocs->get(_doc))) { - docs[i] = _doc; - freqs[i] = _freq; - i++; + docDelta += (docCode >> 1); + _docs[i] = docDelta; + if ((docCode & 1) != 0) { + _freqs[i] = 1; + } else { + _freqs[i] = freqStream->readVInt(); } } } else { - // need one more space for decode, otherwise will buffer overflow when decoding - std::vector<uint32_t> arr(arraySize + 1); - uint32_t serializedSize = freqStream->readVInt(); - // need more space for decode, otherwise will buffer overflow when decoding - std::vector<uint8_t> buf(serializedSize + PFOR_BLOCK_SIZE); - freqStream->readBytes(buf.data(), serializedSize); - P4DEC(buf.data(), arraySize, arr.data()); - - while (i < length && count < df && i < arraySize) { - _doc = arr[i]; - _freq = 1;// _freq is one - if (deletedDocs == NULL || (_doc >= 0 && !deletedDocs->get(_doc))) { - docs[i] = _doc; - freqs[i] = _freq; - i++; + { + uint32_t SerializedSize = freqStream->readVInt(); + std::vector<uint8_t> buf(SerializedSize + PFOR_BLOCK_SIZE); + freqStream->readBytes(buf.data(), SerializedSize); + P4DEC(buf.data(), arraySize, _docs.data()); + } + { + uint32_t SerializedSize = freqStream->readVInt(); + std::vector<uint8_t> buf(SerializedSize + PFOR_BLOCK_SIZE); + freqStream->readBytes(buf.data(), SerializedSize); + P4NZDEC(buf.data(), arraySize, _freqs.data()); + } + } + + while (i < arraySize && count < df) { + _doc = _docs[i]; + _freq = _freqs[i]; + count++; + if (deletedDocs == NULL || (_doc >= 0 && !deletedDocs->get(_doc))) { + docs[i] = _doc; + freqs[i] = _freq; + i++; + } + } + + } else { + //todo: one optimization would be to get the pointer buffer for ram or mmap dirs + //and iterate over them instead of using readByte() intensive functions. + if (i < length && count < df) { + uint32_t arraySize = freqStream->readVInt(); + if (arraySize < PFOR_BLOCK_SIZE) { + int32_t _docDelta{0}; + while (i < length && count < df && i < arraySize) { + // manually inlined call to next() for speed + uint32_t docCode = freqStream->readVInt(); + _docDelta += docCode; + _doc = _docDelta; + _freq = 1; // _freq is one + count++; + + if (deletedDocs == NULL || (_doc >= 0 && !deletedDocs->get(_doc))) { + docs[i] = _doc; + freqs[i] = _freq; + i++; + } + } + } else { + // need one more space for decode, otherwise will buffer overflow when decoding + std::vector<uint32_t> arr(arraySize + 1); + uint32_t serializedSize = freqStream->readVInt(); + // need more space for decode, otherwise will buffer overflow when decoding + std::vector<uint8_t> buf(serializedSize + PFOR_BLOCK_SIZE); + freqStream->readBytes(buf.data(), serializedSize); + P4DEC(buf.data(), arraySize, arr.data()); + + while (i < length && count < df && i < arraySize) { + _doc = arr[i]; + _freq = 1;// _freq is one + if (deletedDocs == NULL || (_doc >= 0 && !deletedDocs->get(_doc))) { + docs[i] = _doc; + freqs[i] = _freq; + i++; + } + count++; } - count++; } } } + return i; } @@ -184,7 +242,7 @@ bool SegmentTermDocs::skipTo(const int32_t target) { skipListReader = _CLNEW DefaultSkipListReader(freqStream->clone(), maxSkipLevels, skipInterval);// lazily clone if (!haveSkipped) {// lazily initialize skip stream - skipListReader->init(skipPointer, freqBasePointer, proxBasePointer, df, currentFieldStoresPayloads); + skipListReader->init(skipPointer, freqBasePointer, proxBasePointer, df, hasProx, currentFieldStoresPayloads); haveSkipped = true; } @@ -195,6 +253,7 @@ bool SegmentTermDocs::skipTo(const int32_t target) { _doc = skipListReader->getDoc(); count = newCount; + pointer = pointerMax; } } diff --git a/src/core/CLucene/index/SegmentTermPositions.cpp b/src/core/CLucene/index/SegmentTermPositions.cpp index 5fe1e787..f132be6a 100644 --- a/src/core/CLucene/index/SegmentTermPositions.cpp +++ b/src/core/CLucene/index/SegmentTermPositions.cpp @@ -9,6 +9,8 @@ #include "Terms.h" +#include <iostream> + CL_NS_USE(util) CL_NS_DEF(index) @@ -52,7 +54,7 @@ void SegmentTermPositions::close() { int32_t SegmentTermPositions::nextPosition() { // TODO:need to do like this: if (indexOptions != IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) - if (parent->proxStream== NULL){ + if (!hasProx){ return 0; } // perform lazy skips if neccessary diff --git a/src/core/CLucene/index/SkipListReader.cpp b/src/core/CLucene/index/SkipListReader.cpp index b2b7136b..54747564 100644 --- a/src/core/CLucene/index/SkipListReader.cpp +++ b/src/core/CLucene/index/SkipListReader.cpp @@ -259,44 +259,39 @@ DefaultSkipListReader::DefaultSkipListReader(CL_NS(store)::IndexInput* _skipStre : MultiLevelSkipListReader(_skipStream, maxSkipLevels, _skipInterval) { freqPointer = _CL_NEWARRAY(int64_t,maxSkipLevels); - if (0) { - proxPointer = _CL_NEWARRAY(int64_t, maxSkipLevels); - } + proxPointer = _CL_NEWARRAY(int64_t, maxSkipLevels); payloadLength = _CL_NEWARRAY(int32_t,maxSkipLevels); memset(freqPointer,0, sizeof(int64_t) * maxSkipLevels); - if (0) { - memset(proxPointer, 0, sizeof(int64_t) * maxSkipLevels); - } + memset(proxPointer, 0, sizeof(int64_t) * maxSkipLevels); memset(payloadLength,0, sizeof(int32_t) * maxSkipLevels); this->lastFreqPointer = 0; - if (0) { - this->lastProxPointer = 0; - } + this->lastProxPointer = 0; this->lastPayloadLength = 0; this->currentFieldStoresPayloads = false; } DefaultSkipListReader::~DefaultSkipListReader(){ _CLDELETE_LARRAY(freqPointer); - if (0) { - _CLDELETE_LARRAY(proxPointer); - } + _CLDELETE_LARRAY(proxPointer); _CLDELETE_LARRAY(payloadLength); } -void DefaultSkipListReader::init(const int64_t _skipPointer, const int64_t freqBasePointer, const int64_t proxBasePointer, const int32_t df, const bool storesPayloads) { - MultiLevelSkipListReader::init(_skipPointer, df); +void DefaultSkipListReader::init(const int64_t _skipPointer, const int64_t freqBasePointer, + const int64_t proxBasePointer, const int32_t df, + const bool hasProx, const bool storesPayloads) { + MultiLevelSkipListReader::init(_skipPointer, df); this->currentFieldStoresPayloads = storesPayloads; + this->hasProx = hasProx; lastFreqPointer = freqBasePointer; - if (0) { - lastProxPointer = proxBasePointer; - } + if (this->hasProx) { + lastProxPointer = proxBasePointer; + } for (int32_t j=0; j<maxNumberOfSkipLevels; j++){ freqPointer[j] = freqBasePointer; - if (0) { - proxPointer[j] = proxBasePointer; - } + if (this->hasProx) { + proxPointer[j] = proxBasePointer; + } payloadLength[j] = 0; } } @@ -305,10 +300,10 @@ int64_t DefaultSkipListReader::getFreqPointer() const { return lastFreqPointer; } int64_t DefaultSkipListReader::getProxPointer() const { - return 0; - if (0) { - return lastProxPointer; - } + if (hasProx) { + return lastProxPointer; + } + return 0; } int32_t DefaultSkipListReader::getPayloadLength() const { return lastPayloadLength; @@ -317,18 +312,18 @@ int32_t DefaultSkipListReader::getPayloadLength() const { void DefaultSkipListReader::seekChild(const int32_t level) { MultiLevelSkipListReader::seekChild(level); freqPointer[level] = lastFreqPointer; - if (0) { - proxPointer[level] = lastProxPointer; - } + if (hasProx) { + proxPointer[level] = lastProxPointer; + } payloadLength[level] = lastPayloadLength; } void DefaultSkipListReader::setLastSkipData(const int32_t level) { MultiLevelSkipListReader::setLastSkipData(level); lastFreqPointer = freqPointer[level]; - if (0) { - lastProxPointer = proxPointer[level]; - } + if (hasProx) { + lastProxPointer = proxPointer[level]; + } lastPayloadLength = payloadLength[level]; } @@ -349,9 +344,9 @@ int32_t DefaultSkipListReader::readSkipData(const int32_t level, CL_NS(store)::I delta = _skipStream->readVInt(); } freqPointer[level] += _skipStream->readVInt(); - if (0) { - proxPointer[level] += _skipStream->readVInt(); - } + if (hasProx) { + proxPointer[level] += _skipStream->readVInt(); + } return delta; } diff --git a/src/core/CLucene/index/SkipListWriter.cpp b/src/core/CLucene/index/SkipListWriter.cpp index 2b25ff68..55ebfb3b 100644 --- a/src/core/CLucene/index/SkipListWriter.cpp +++ b/src/core/CLucene/index/SkipListWriter.cpp @@ -102,7 +102,7 @@ void DefaultSkipListWriter::setSkipData(int32_t doc, bool storePayloads, int32_t this->curStorePayloads = storePayloads; this->curPayloadLength = payloadLength; this->curFreqPointer = freqOutput->getFilePointer(); - if (proxOutput != nullptr) { + if (hasProx) { this->curProxPointer = proxOutput->getFilePointer(); } } @@ -111,7 +111,7 @@ void DefaultSkipListWriter::resetSkip() { MultiLevelSkipListWriter::resetSkip(); memset(lastSkipDoc, 0, numberOfSkipLevels * sizeof(int32_t)); Arrays<int32_t>::fill(lastSkipPayloadLength, numberOfSkipLevels, -1);// we don't have to write the first length in the skip list - if (proxOutput != nullptr) { + if (hasProx) { Arrays<int64_t>::fill(lastSkipProxPointer, numberOfSkipLevels, proxOutput->getFilePointer()); } Arrays<int64_t>::fill(lastSkipFreqPointer, numberOfSkipLevels, freqOutput->getFilePointer()); @@ -156,7 +156,7 @@ void DefaultSkipListWriter::writeSkipData(int32_t level, IndexOutput *skipBuffer skipBuffer->writeVInt(curDoc - lastSkipDoc[level]); } skipBuffer->writeVInt((int32_t) (curFreqPointer - lastSkipFreqPointer[level])); - if (curProxPointer != -1) { + if (hasProx) { skipBuffer->writeVInt((int32_t) (curProxPointer - lastSkipProxPointer[level])); } @@ -164,12 +164,16 @@ void DefaultSkipListWriter::writeSkipData(int32_t level, IndexOutput *skipBuffer //System.out.println("write doc at level " + level + ": " + curDoc); lastSkipFreqPointer[level] = curFreqPointer; - if (curProxPointer != -1) { + if (hasProx) { lastSkipProxPointer[level] = curProxPointer; } } -DefaultSkipListWriter::DefaultSkipListWriter(int32_t skipInterval, int32_t numberOfSkipLevels, int32_t docCount, IndexOutput *freqOutput, IndexOutput *proxOutput) : MultiLevelSkipListWriter(skipInterval, numberOfSkipLevels, docCount) { +DefaultSkipListWriter::DefaultSkipListWriter(int32_t skipInterval, int32_t numberOfSkipLevels, + int32_t docCount, IndexOutput* freqOutput, + IndexOutput* proxOutput) + : MultiLevelSkipListWriter(skipInterval, numberOfSkipLevels, docCount), + hasProx(proxOutput != nullptr) { this->freqOutput = freqOutput; this->proxOutput = proxOutput; this->curDoc = this->curPayloadLength = 0; @@ -178,12 +182,16 @@ DefaultSkipListWriter::DefaultSkipListWriter(int32_t skipInterval, int32_t numbe lastSkipDoc = _CL_NEWARRAY(int32_t, numberOfSkipLevels); lastSkipPayloadLength = _CL_NEWARRAY(int32_t, numberOfSkipLevels); lastSkipFreqPointer = _CL_NEWARRAY(int64_t, numberOfSkipLevels); - lastSkipProxPointer = _CL_NEWARRAY(int64_t, numberOfSkipLevels); + if (hasProx) { + lastSkipProxPointer = _CL_NEWARRAY(int64_t, numberOfSkipLevels); + } } DefaultSkipListWriter::~DefaultSkipListWriter() { _CLDELETE_ARRAY(lastSkipDoc); _CLDELETE_ARRAY(lastSkipPayloadLength); _CLDELETE_ARRAY(lastSkipFreqPointer); - _CLDELETE_ARRAY(lastSkipProxPointer); + if (hasProx) { + _CLDELETE_ARRAY(lastSkipProxPointer); + } } CL_NS_END diff --git a/src/core/CLucene/index/_DocumentsWriter.h b/src/core/CLucene/index/_DocumentsWriter.h index f50d8e85..0af73e53 100644 --- a/src/core/CLucene/index/_DocumentsWriter.h +++ b/src/core/CLucene/index/_DocumentsWriter.h @@ -101,6 +101,7 @@ public: virtual int64_t getRAMUsed() = 0; virtual const std::vector<int32_t>* getBufferedDeleteDocIDs() = 0; virtual ~IDocumentsWriter() {} + virtual bool hasProx() = 0; }; /** @@ -209,6 +210,7 @@ private: // Currently used only for deleting a doc on hitting an non-aborting exception std::vector<int32_t> bufferedDeleteDocIDs; std::vector<uint32_t> docDeltaBuffer; + std::vector<uint32_t> freqBuffer; // The max number of delete terms that can be buffered before // they must be flushed to disk. @@ -994,6 +996,7 @@ public: std::string toMB(int64_t v); + bool hasProx() override; }; diff --git a/src/core/CLucene/index/_FieldInfos.h b/src/core/CLucene/index/_FieldInfos.h index 99f0c6f3..414dcc90 100644 --- a/src/core/CLucene/index/_FieldInfos.h +++ b/src/core/CLucene/index/_FieldInfos.h @@ -32,6 +32,7 @@ class FieldInfo :LUCENE_BASE{ bool storePositionWithTermVector; bool omitNorms; // omit norms associated with indexed fields + bool hasProx = false; bool storePayloads; // whether this field stores payloads together with term positions @@ -54,6 +55,7 @@ class FieldInfo :LUCENE_BASE{ const bool storeOffsetWithTermVector, const bool storePositionWithTermVector, const bool omitNorms, + const bool hasProx, const bool storePayloads); //Func - Destructor @@ -88,7 +90,8 @@ public: STORE_POSITIONS_WITH_TERMVECTOR = 0x4, STORE_OFFSET_WITH_TERMVECTOR = 0x8, OMIT_NORMS = 0x10, - STORE_PAYLOADS = 0x20 + STORE_PAYLOADS = 0x20, + TERM_FREQ_AND_POSITIONS = 0x40 }; FieldInfos(); @@ -122,6 +125,8 @@ public: */ void addIndexed(const TCHAR** names, const bool storeTermVectors, const bool storePositionWithTermVector, const bool storeOffsetWithTermVector); + bool hasProx(); + /** * Assumes the fields are not storing term vectors. * @@ -130,11 +135,12 @@ public: * * @see #add(TCHAR*, bool) */ - void add(const TCHAR** names, const bool isIndexed, const bool storeTermVector=false, - const bool storePositionWithTermVector=false, const bool storeOffsetWithTermVector=false, - const bool omitNorms=false, const bool storePayloads=false); + void add(const TCHAR** names, const bool isIndexed, const bool storeTermVector = false, + const bool storePositionWithTermVector = false, + const bool storeOffsetWithTermVector = false, const bool omitNorms = false, + const bool hasProx = false, const bool storePayloads = false); - // Merges in information from another FieldInfos. + // Merges in information from another FieldInfos. void add(FieldInfos* other); /** If the field is not yet known, adds it. If it is known, checks to make @@ -150,12 +156,16 @@ public: * @param omitNorms true if the norms for the indexed field should be omitted * @param storePayloads true if payloads should be stored for this field */ - FieldInfo* add(const TCHAR* name, const bool isIndexed, const bool storeTermVector=false, - const bool storePositionWithTermVector=false, const bool storeOffsetWithTermVector=false, const bool omitNorms=false, const bool storePayloads=false); - - // was void - FieldInfo* addInternal( const TCHAR* name,const bool isIndexed, const bool storeTermVector, - const bool storePositionWithTermVector, const bool storeOffsetWithTermVector, const bool omitNorms, const bool storePayloads); + FieldInfo* add(const TCHAR* name, const bool isIndexed, const bool storeTermVector = false, + const bool storePositionWithTermVector = false, + const bool storeOffsetWithTermVector = false, const bool omitNorms = false, + const bool hasProx = false, const bool storePayloads = false); + + // was void + FieldInfo* addInternal(const TCHAR* name, const bool isIndexed, const bool storeTermVector, + const bool storePositionWithTermVector, + const bool storeOffsetWithTermVector, const bool omitNorms, + const bool hasProx, const bool storePayloads); int32_t fieldNumber(const TCHAR* fieldName)const; diff --git a/src/core/CLucene/index/_SegmentHeader.h b/src/core/CLucene/index/_SegmentHeader.h index 7eaece9f..8a1b8d41 100644 --- a/src/core/CLucene/index/_SegmentHeader.h +++ b/src/core/CLucene/index/_SegmentHeader.h @@ -56,6 +56,7 @@ private: protected: bool currentFieldStoresPayloads; + bool hasProx = false; public: ///\param Parent must be a segment reader diff --git a/src/core/CLucene/index/_SkipListReader.h b/src/core/CLucene/index/_SkipListReader.h index 05e0e284..5031a981 100644 --- a/src/core/CLucene/index/_SkipListReader.h +++ b/src/core/CLucene/index/_SkipListReader.h @@ -144,6 +144,8 @@ protected: class DefaultSkipListReader: public MultiLevelSkipListReader { private: bool currentFieldStoresPayloads; + bool hasProx = false; + int64_t* freqPointer{nullptr}; int64_t* proxPointer{nullptr}; int32_t* payloadLength; @@ -156,9 +158,11 @@ public: DefaultSkipListReader(CL_NS(store)::IndexInput* _skipStream, const int32_t maxSkipLevels, const int32_t _skipInterval); virtual ~DefaultSkipListReader(); - void init(const int64_t _skipPointer, const int64_t freqBasePointer, const int64_t proxBasePointer, const int32_t df, const bool storesPayloads); + void init(const int64_t _skipPointer, const int64_t freqBasePointer, + const int64_t proxBasePointer, const int32_t df, const bool hasProx, + const bool storesPayloads); - /** Returns the freq pointer of the doc to which the last call of + /** Returns the freq pointer of the doc to which the last call of * {@link MultiLevelSkipListReader#skipTo(int)} has skipped. */ int64_t getFreqPointer() const; diff --git a/src/core/CLucene/index/_SkipListWriter.h b/src/core/CLucene/index/_SkipListWriter.h index a22a37a8..2181c69a 100644 --- a/src/core/CLucene/index/_SkipListWriter.h +++ b/src/core/CLucene/index/_SkipListWriter.h @@ -95,7 +95,7 @@ private: int32_t* lastSkipDoc; int32_t* lastSkipPayloadLength; int64_t* lastSkipFreqPointer; - int64_t* lastSkipProxPointer; + int64_t* lastSkipProxPointer = nullptr; CL_NS(store)::IndexOutput* freqOutput; CL_NS(store)::IndexOutput* proxOutput{nullptr}; @@ -105,6 +105,8 @@ private: int32_t curPayloadLength; int64_t curFreqPointer; int64_t curProxPointer{-1}; + + bool hasProx = false; protected: @@ -112,17 +114,17 @@ protected: public: - DefaultSkipListWriter(int32_t skipInterval, int32_t numberOfSkipLevels, int32_t docCount, + DefaultSkipListWriter(int32_t skipInterval, int32_t numberOfSkipLevels, int32_t docCount, CL_NS(store)::IndexOutput* freqOutput, CL_NS(store)::IndexOutput* proxOutput); - ~DefaultSkipListWriter(); - - friend class SegmentMerger; - friend class DocumentsWriter; - /** + ~DefaultSkipListWriter(); + + friend class SegmentMerger; + friend class DocumentsWriter; + /** * Sets the values for the current skip data. */ - void setSkipData(int32_t doc, bool storePayloads, int32_t payloadLength); - void resetSkip(); + void setSkipData(int32_t doc, bool storePayloads, int32_t payloadLength); + void resetSkip(); }; diff --git a/src/core/CLucene/index/_TermInfosWriter.h b/src/core/CLucene/index/_TermInfosWriter.h index c2f72e70..2bd7713a 100644 --- a/src/core/CLucene/index/_TermInfosWriter.h +++ b/src/core/CLucene/index/_TermInfosWriter.h @@ -45,7 +45,7 @@ public: int32_t maxSkipLevels; LUCENE_STATIC_CONSTANT(int32_t, FORMAT = -3); - LUCENE_STATIC_CONSTANT(int32_t, DEFAULT_TERMDOCS_SKIP_INTERVAL = 16); + LUCENE_STATIC_CONSTANT(int32_t, DEFAULT_TERMDOCS_SKIP_INTERVAL = PFOR_BLOCK_SIZE); int32_t indexInterval;// = 128 int32_t skipInterval;// = 16 @@ -103,7 +103,7 @@ public: //smaller indices, greater acceleration, but fewer accelerable cases, while //smaller values result in bigger indices, less acceleration and more //accelerable cases. More detailed experiments would be useful here. */ - LUCENE_STATIC_CONSTANT(int32_t, DEFAULT_TERMDOCS_SKIP_INTERVAL = 16); + LUCENE_STATIC_CONSTANT(int32_t, DEFAULT_TERMDOCS_SKIP_INTERVAL = PFOR_BLOCK_SIZE); /** diff --git a/src/core/CLucene/search/PhraseQuery.cpp b/src/core/CLucene/search/PhraseQuery.cpp index 2942d0df..f0070d51 100644 --- a/src/core/CLucene/search/PhraseQuery.cpp +++ b/src/core/CLucene/search/PhraseQuery.cpp @@ -204,7 +204,7 @@ CL_NS_DEF(search) Query* termQuery = _CLNEW TermQuery(term); termQuery->setBoost(getBoost()); Weight* ret = termQuery->_createWeight(searcher); - _CLLDELETE(termQuery); + // _CLLDELETE(termQuery); return ret; } return _CLNEW PhraseWeight(searcher,this); diff --git a/src/core/CLucene/search/PhraseScorer.cpp b/src/core/CLucene/search/PhraseScorer.cpp index 06d88363..cfab2eac 100644 --- a/src/core/CLucene/search/PhraseScorer.cpp +++ b/src/core/CLucene/search/PhraseScorer.cpp @@ -101,8 +101,9 @@ CL_NS_DEF(search) float_t PhraseScorer::score(){ //System.out.println("scoring " + first.doc); - float_t raw = getSimilarity()->tf(freq) * value; // raw score - return raw * Similarity::decodeNorm(norms[first->doc]); // normalize + // float_t raw = getSimilarity()->tf(freq) * value; // raw score + // return raw * Similarity::decodeNorm(norms[first->doc]); // normalize + return 1.0f; } bool PhraseScorer::skipTo(int32_t target) { diff --git a/src/core/CLucene/store/FSDirectory.cpp b/src/core/CLucene/store/FSDirectory.cpp index 5392f6bb..5d7532e8 100644 --- a/src/core/CLucene/store/FSDirectory.cpp +++ b/src/core/CLucene/store/FSDirectory.cpp @@ -22,6 +22,7 @@ #include <errno.h> #include <assert.h> +#include <iostream> #include "FSDirectory.h" #include "LockFactory.h" @@ -241,6 +242,12 @@ void FSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len) { CND_PRECONDITION(handle->fhandle>=0,"file is not open"); SCOPED_LOCK_MUTEX(*handle->SHARED_LOCK) + // todo: The file pointer may be wrong + int64_t position = getFilePointer(); + if (_pos != position) { + _pos = position; + } + if ( handle->_fpos != _pos ){ if ( fileSeek(handle->fhandle,_pos,SEEK_SET) != _pos ){ _CLTHROWA( CL_ERR_IO, "File IO Seek error"); diff --git a/src/demo/CMakeLists.txt b/src/demo/CMakeLists.txt index d524b451..171e554a 100644 --- a/src/demo/CMakeLists.txt +++ b/src/demo/CMakeLists.txt @@ -62,7 +62,7 @@ ADD_EXECUTABLE(abnormal EXCLUDE_FROM_ALL ) TARGET_LINK_LIBRARIES(abnormal clucene-core-static clucene-shared-static clucene-contribs-lib ic ${EXTRA_LIBS}) -ADD_EXECUTABLE(search +ADD_EXECUTABLE(search EXCLUDE_FROM_ALL ./search.cpp ${clucene-shared_SOURCE_DIR}/CLucene/util/Misc.cpp ${clucene-shared_SOURCE_DIR}/CLucene/util/dirent.cpp @@ -95,4 +95,13 @@ ADD_EXECUTABLE(pointSearch8 EXCLUDE_FROM_ALL ${clucene-shared_SOURCE_DIR}/CLucene/util/dirent.cpp ${demo_HEADERS} ) -TARGET_LINK_LIBRARIES(pointSearch8 clucene-core-static clucene-shared-static clucene-contribs-lib ic ${EXTRA_LIBS}) \ No newline at end of file +TARGET_LINK_LIBRARIES(pointSearch8 clucene-core-static clucene-shared-static clucene-contribs-lib ic ${EXTRA_LIBS}) + +ADD_EXECUTABLE(demo EXCLUDE_FROM_ALL + ./demo.cpp + ./simdjson.cpp + ${clucene-shared_SOURCE_DIR}/CLucene/util/Misc.cpp + ${clucene-shared_SOURCE_DIR}/CLucene/util/dirent.cpp + ${demo_HEADERS} +) +TARGET_LINK_LIBRARIES(demo clucene-core-static clucene-shared-static clucene-contribs-lib ic ${EXTRA_LIBS}) \ No newline at end of file diff --git a/src/demo/demo.cpp b/src/demo/demo.cpp new file mode 100644 index 00000000..667b2070 --- /dev/null +++ b/src/demo/demo.cpp @@ -0,0 +1,401 @@ +#include <CLucene.h> +#include <CLucene/analysis/LanguageBasedAnalyzer.h> +#include <stdlib.h> +#include <time.h> +#include <unistd.h> + +#include <algorithm> +#include <chrono> +#include <codecvt> +#include <cstdint> +#include <fstream> +#include <iostream> +#include <locale> +#include <string> +#include <thread> +#include <unordered_map> +#include <utility> +#include <variant> +#include <vector> + +#include "CLucene/search/TermQuery.h" +#include "CLucene/store/Directory.h" +#include "CLucene/store/FSDirectory.h" +#include "CLucene/util/NumericUtils.h" +#include "CLucene/util/bkd/bkd_writer.h" +#include "simdjson.h" + +CL_NS_USE(util) +CL_NS_USE(store) +CL_NS_USE(search) +CL_NS_USE(index) + +using namespace simdjson; +using namespace std::chrono; + +class TimeGuard { +public: + TimeGuard(std::string message) : message_(std::move(message)) { + begin_ = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + } + + ~TimeGuard() { + int64_t end = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + std::cout << message_ << ": " << end - begin_ << std::endl; + } + +private: + std::string message_; + int64_t begin_ = 0; +}; + +inline std::wstring to_wide_string(const std::string& input) { + std::wstring_convert<std::codecvt_utf8<wchar_t>> converter; + return converter.from_bytes(input); +} + +void get_process_physical_hold(int64_t& phy_hold) { + phy_hold = 0; + int64_t unused = 0; + char buffer[1024] = ""; + FILE* file = fopen("/proc/self/status", "r"); + if (NULL != file) { + while (fscanf(file, " %1023s", buffer) == 1) { + if (strcmp(buffer, "VmRSS:") == 0) { + fscanf(file, " %ld", &phy_hold); + } + if (strcmp(buffer, "VmHWM:") == 0) { + fscanf(file, " %ld", &unused); + } + if (strcmp(buffer, "VmSize:") == 0) { + fscanf(file, " %ld", &unused); + } + if (strcmp(buffer, "VmPeak:") == 0) { + fscanf(file, " %ld", &unused); + } + } + fclose(file); + } +} + +int main() { + srand((unsigned)time(NULL)); + + // std::locale loc("zh_CN.UTF-8"); + // std::locale::global(loc); + + // std::thread t([]() { + // for (;;) { + // std::this_thread::sleep_for(std::chrono::seconds(2)); + // int64_t process_hold = 0; + // get_process_physical_hold(process_hold); + // std::cout << "physicalMem: " << process_hold / 1024 << std::endl; + // } + // }); + // t.detach(); + + std::string name = "description"; + std::unordered_map<std::string, std::vector<std::string>> datas; + { + for (int32_t i = 1; i <= 6; i++) { + std::ifstream ifs; + std::string path = "/mnt/disk2/yangsiyu/git_events/2022-09-13-"; + path += std::to_string(i + 5); + path += ".json"; + ifs.open(path, std::ios::in); + std::string line; + ondemand::parser parser; + int j = 0; + while (getline(ifs, line)) { + // padded_string json(line); + // ondemand::document tweet = parser.iterate(json); + // for (auto d : tweet.get_object()) { + // std::string key = (std::string)d.unescaped_key().value(); + // if (key == "payload") { + // for (auto d1 : d.value().get_object()) { + // std::string key1 = (std::string)d1.unescaped_key().value(); + // if (key1 == name) { + // if (!d1.value().is_null()) { + // std::string value = + // (std::string)d1.value().raw_json_token().value(); + // // if (value == "\"https://api.github.com/users/Juveniel\"") { + // // static int32_t a = 0; + // // std::cout << key1 << ", " << value << ", " << a++ << std::endl; + // // } + // datas[key1].emplace_back(std::move(value)); + // } + // } + // } + // } + + // // if (key == name) { + // // std::string value = (std::string)d.value().raw_json_token().value(); + // // datas[key].emplace_back(std::move(value)); + // // } + // } + datas[name].emplace_back(line); + // if (++j == 10) { + // break; + // } + } + ifs.close(); + } + } + + // { + // std::ofstream ofs; + // ofs.open("/mnt/disk2/yangsiyu/git_events/a.txt"); + // for (auto& data : datas[name]) { + // ofs << data << std::endl; + // } + // ofs.close(); + // } + + // { + // std::ifstream ifs; + // ifs.open("/mnt/disk2/yangsiyu/git_events/a.txt"); + // std::string line; + // while (getline(ifs, line)) { + // datas[name].push_back(line); + // } + // ifs.close(); + // } + + std::cout << "getchar: " << datas[name].size() << ", pid: " << getpid() << std::endl; + getchar(); + + // std::vector<std::thread> threads; + // for (int32_t i = 0; i < 3; i++) { + // threads.emplace_back([&datas, i]() { + for (int32_t k = 0; k < 1; k++) { + TimeGuard t("time"); + + std::string path = "/mnt/disk2/yangsiyu/clucene/index" + std::to_string(k + 1); + // auto analyzer = _CLNEW lucene::analysis::standard::StandardAnalyzer(); + auto analyzer = _CLNEW lucene::analysis::SimpleAnalyzer<char>(); + // auto analyzer = _CLNEW lucene::analysis::SimpleAnalyzer<TCHAR>(); + auto indexwriter = _CLNEW lucene::index::IndexWriter(path.c_str(), analyzer, true); + indexwriter->setRAMBufferSizeMB(2048); + indexwriter->setMaxFieldLength(0x7FFFFFFFL); + indexwriter->setMergeFactor(100000000); + indexwriter->setUseCompoundFile(false); + + auto char_string_reader = _CLNEW lucene::util::SStringReader<char>; + + auto doc = _CLNEW lucene::document::Document(); + auto field_config = + lucene::document::Field::STORE_NO | lucene::document::Field::INDEX_NONORMS; + field_config |= lucene::document::Field::INDEX_TOKENIZED; + auto field_name = std::wstring(name.begin(), name.end()); + auto field = _CLNEW lucene::document::Field(field_name.c_str(), field_config); + field->setOmitTermFreqAndPositions(false); + doc->add(*field); + + for (int32_t j = 0; j < 1; j++) { + for (auto& str : datas[name]) { + // std::cout << "name: " << name << ", value: " << str << std::endl; + // auto field_value = to_wide_string(str); + // field->setValue(field_value.data(), field_value.size()); + + char_string_reader->init(str.data(), str.size(), false); + auto stream = analyzer->reusableTokenStream(field->name(), char_string_reader); + field->setValue(stream); + + indexwriter->addDocument(doc); + } + } + + std::cout << "---------------------" << std::endl; + + indexwriter->close(); + + _CLLDELETE(indexwriter); + _CLLDELETE(doc); + _CLLDELETE(analyzer); + _CLLDELETE(char_string_reader); + } + // }); + // } + + // for (int32_t i = 0; i < 3; i++) { + // threads[i].join(); + // } + + std::cout << "---------------------" << std::endl; + + auto indexCompaction = [&datas, &name](std::vector<lucene::store::Directory*> srcDirs, + std::vector<lucene::store::Directory*> destDirs, + int32_t count) { + auto indexwriter = _CLNEW lucene::index::IndexWriter("/mnt/disk2/yangsiyu/clucene/index0", + nullptr, true); + + std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec( + srcDirs.size(), std::vector<std::pair<uint32_t, uint32_t>>(count)); + int32_t idx = 0; + int32_t id = 0; + for (int32_t i = 0; i < count; i++) { + for (int32_t j = 0; j < srcDirs.size(); j++) { + if (id == count * destDirs.size()) { + idx++; + id = 0; + } + trans_vec[j][i] = std::make_pair(idx, id++); + } + } + + std::vector<uint32_t> dest_index_docs(destDirs.size()); + for (int32_t i = 0; i < destDirs.size(); i++) { + dest_index_docs[i] = count * destDirs.size(); + } + + indexwriter->indexCompaction(srcDirs, destDirs, trans_vec, dest_index_docs); + indexwriter->close(); + _CLDELETE(indexwriter); + }; + + // index compaction + { + { + std::vector<lucene::store::Directory*> srcDirs; + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index1")); + std::vector<lucene::store::Directory*> destDirs; + destDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index10")); + destDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index11")); + destDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index12")); + indexCompaction(srcDirs, destDirs, datas[name].size()); + for (auto& p : srcDirs) { + p->close(); + _CLDECDELETE(p); + } + for (auto& p : destDirs) { + p->close(); + _CLDECDELETE(p); + } + } + { + std::vector<lucene::store::Directory*> srcDirs; + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index10")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index11")); + srcDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index12")); + std::vector<lucene::store::Directory*> destDirs; + destDirs.push_back(FSDirectory::getDirectory("/mnt/disk2/yangsiyu/clucene/index13")); + indexCompaction(srcDirs, destDirs, datas[name].size() * 3 * 3); + for (auto& p : srcDirs) { + p->close(); + _CLDECDELETE(p); + } + for (auto& p : destDirs) { + p->close(); + _CLDECDELETE(p); + } + } + } + + // search + { + std::vector<int32_t> v; + v.push_back(1); + v.push_back(10); + v.push_back(11); + v.push_back(12); + v.push_back(13); + for (auto idx : v) { + std::cout << "---------------------" << std::endl; + std::string path = "/mnt/disk2/yangsiyu/clucene/index" + std::to_string(idx); + IndexReader* reader = IndexReader::open(path.c_str()); + IndexSearcher index_searcher(reader); + for (int i = 0; i < 1; i++) { + { + TimeGuard time("term query1"); + Term* t = _CLNEW Term(_T("description"), _T("telerik")); + Query* query = _CLNEW TermQuery(t); + _CLLDECDELETE(t); + + std::vector<int32_t> result; + index_searcher._search(query, + [&result](const int32_t docid, const float_t /*score*/) { + result.push_back(docid); + }); + std::cout << "term result: " << result.size() << std::endl; + _CLLDELETE(query); + } + { + TimeGuard time("term query2"); + Term* t = _CLNEW Term(_T("description"), _T("kendo")); + Query* query = _CLNEW TermQuery(t); + _CLLDECDELETE(t); + + std::vector<int32_t> result; + index_searcher._search(query, + [&result](const int32_t docid, const float_t /*score*/) { + result.push_back(docid); + }); + std::cout << "term result: " << result.size() << std::endl; + _CLLDELETE(query); + } + { + TimeGuard time("term query3"); + Term* t = _CLNEW Term(_T("description"), _T("themes")); + Query* query = _CLNEW TermQuery(t); + _CLLDECDELETE(t); + + std::vector<int32_t> result; + index_searcher._search(query, + [&result](const int32_t docid, const float_t /*score*/) { + result.push_back(docid); + }); + std::cout << "term result: " << result.size() << std::endl; + _CLLDELETE(query); + } + { + TimeGuard time("phrase query1"); + Term* term1 = _CLNEW Term(_T( "description" ), _T( "telerik" )); + Term* term2 = _CLNEW Term(_T( "description" ), _T( "kendo" )); + Term* term3 = _CLNEW Term(_T( "description" ), _T( "themes" )); + PhraseQuery* query = _CLNEW PhraseQuery(); + query->add(term1); + query->add(term2); + query->add(term3); + _CLLDECDELETE(term1); + _CLLDECDELETE(term2); + _CLLDECDELETE(term3); + + std::vector<int32_t> result; + index_searcher._search(query, + [&result](const int32_t docid, const float_t /*score*/) { + result.push_back(docid); + }); + std::cout << "phrase result: " << result.size() << std::endl; + _CLLDELETE(query); + } + { + TimeGuard time("phrase query2"); + Term* term1 = _CLNEW Term(_T( "description" ), _T( "telerik" )); + PhraseQuery* query = _CLNEW PhraseQuery(); + query->add(term1); + _CLLDECDELETE(term1); + + std::vector<int32_t> result; + index_searcher._search(query, + [&result](const int32_t docid, const float_t /*score*/) { + result.push_back(docid); + }); + std::cout << "phrase result: " << result.size() << std::endl; + _CLLDELETE(query); + } + } + reader->close(); + _CLDELETE(reader); + } + } + + return 0; +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org