mikemccand commented on code in PR #13585: URL: https://github.com/apache/lucene/pull/13585#discussion_r1684540391
########## lucene/core/src/java/org/apache/lucene/codecs/lucene912/Lucene912PostingsWriter.java: ########## @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.codecs.lucene912; + +import static org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat.BLOCK_SIZE; +import static org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat.DOC_CODEC; +import static org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat.PAY_CODEC; +import static org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat.*; +import static org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat.TERMS_CODEC; +import static org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat.VERSION_CURRENT; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.lucene.codecs.BlockTermState; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.CompetitiveImpactAccumulator; +import org.apache.lucene.codecs.PushPostingsWriterBase; +import org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat.IntBlockTermState; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.Impact; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BitUtil; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; + +public class Lucene912PostingsWriter extends PushPostingsWriterBase { + + static final IntBlockTermState EMPTY_STATE = new IntBlockTermState(); + + IndexOutput docOut; + IndexOutput posOut; + IndexOutput payOut; + + IntBlockTermState lastState; + + // Holds starting file pointers for current term: + private long docStartFP; + private long posStartFP; + private long payStartFP; + + final long[] docDeltaBuffer; + final long[] freqBuffer; + private int docBufferUpto; + + final long[] posDeltaBuffer; + final long[] payloadLengthBuffer; + final long[] offsetStartDeltaBuffer; + final long[] offsetLengthBuffer; + private int posBufferUpto; + + private byte[] payloadBytes; + private int payloadByteUpto; + + private int lastBlockDocID; + private int lastSkipDocID; + private long lastSkipPosFP; + private long lastSkipPayFP; + + private int docID; + private int lastDocID; + private int lastPosition; + private int lastStartOffset; + private int docCount; + + private final PForUtil pforUtil; + private final ForDeltaUtil forDeltaUtil; + + private boolean fieldHasNorms; + private NumericDocValues norms; + private final CompetitiveImpactAccumulator competitiveFreqNormAccumulator = + new CompetitiveImpactAccumulator(); + private final CompetitiveImpactAccumulator skipCompetitiveFreqNormAccumulator = + new CompetitiveImpactAccumulator(); + + private final ByteBuffersDataOutput spareOutput = ByteBuffersDataOutput.newResettableInstance(); + private final ByteBuffersDataOutput blockOutput = ByteBuffersDataOutput.newResettableInstance(); + private final ByteBuffersDataOutput skipOutput = ByteBuffersDataOutput.newResettableInstance(); + + public Lucene912PostingsWriter(SegmentWriteState state) throws IOException { + String docFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, Lucene912PostingsFormat.DOC_EXTENSION); + docOut = state.directory.createOutput(docFileName, state.context); + IndexOutput posOut = null; + IndexOutput payOut = null; + boolean success = false; + try { + CodecUtil.writeIndexHeader( + docOut, DOC_CODEC, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix); + final ForUtil forUtil = new ForUtil(); + forDeltaUtil = new ForDeltaUtil(forUtil); + pforUtil = new PForUtil(forUtil); + if (state.fieldInfos.hasProx()) { + posDeltaBuffer = new long[BLOCK_SIZE]; + String posFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, Lucene912PostingsFormat.POS_EXTENSION); + posOut = state.directory.createOutput(posFileName, state.context); + CodecUtil.writeIndexHeader( + posOut, POS_CODEC, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix); + + if (state.fieldInfos.hasPayloads()) { + payloadBytes = new byte[128]; + payloadLengthBuffer = new long[BLOCK_SIZE]; + } else { + payloadBytes = null; + payloadLengthBuffer = null; + } + + if (state.fieldInfos.hasOffsets()) { + offsetStartDeltaBuffer = new long[BLOCK_SIZE]; + offsetLengthBuffer = new long[BLOCK_SIZE]; + } else { + offsetStartDeltaBuffer = null; + offsetLengthBuffer = null; + } + + if (state.fieldInfos.hasPayloads() || state.fieldInfos.hasOffsets()) { + String payFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + Lucene912PostingsFormat.PAY_EXTENSION); + payOut = state.directory.createOutput(payFileName, state.context); + CodecUtil.writeIndexHeader( + payOut, PAY_CODEC, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix); + } + } else { + posDeltaBuffer = null; + payloadLengthBuffer = null; + offsetStartDeltaBuffer = null; + offsetLengthBuffer = null; + payloadBytes = null; + } + this.payOut = payOut; + this.posOut = posOut; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(docOut, posOut, payOut); + } + } + + docDeltaBuffer = new long[BLOCK_SIZE]; + freqBuffer = new long[BLOCK_SIZE]; + } + + @Override + public IntBlockTermState newTermState() { + return new IntBlockTermState(); + } + + @Override + public void init(IndexOutput termsOut, SegmentWriteState state) throws IOException { + CodecUtil.writeIndexHeader( + termsOut, TERMS_CODEC, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix); + termsOut.writeVInt(BLOCK_SIZE); + } + + @Override + public void setField(FieldInfo fieldInfo) { + super.setField(fieldInfo); + lastState = EMPTY_STATE; + fieldHasNorms = fieldInfo.hasNorms(); + } + + @Override + public void startTerm(NumericDocValues norms) { + docStartFP = docOut.getFilePointer(); + if (writePositions) { + posStartFP = posOut.getFilePointer(); + lastSkipPosFP = posStartFP; + if (writePayloads || writeOffsets) { + payStartFP = payOut.getFilePointer(); + lastSkipPayFP = payStartFP; + } + } + lastDocID = -1; + lastBlockDocID = -1; + lastSkipDocID = -1; + this.norms = norms; + competitiveFreqNormAccumulator.clear(); + } + + @Override + public void startDoc(int docID, int termDocFreq) throws IOException { + if (docBufferUpto == BLOCK_SIZE) { + flushDocBlock(false); + docBufferUpto = 0; + } + + final int docDelta = docID - lastDocID; + + if (docID < 0 || docDelta <= 0) { + throw new CorruptIndexException( + "docs out of order (" + docID + " <= " + lastDocID + " )", docOut); + } + + docDeltaBuffer[docBufferUpto] = docDelta; + if (writeFreqs) { + freqBuffer[docBufferUpto] = termDocFreq; + } + + this.docID = docID; + lastPosition = 0; + lastStartOffset = 0; + + long norm; + if (fieldHasNorms) { + boolean found = norms.advanceExact(docID); + if (found == false) { + // This can happen if indexing hits a problem after adding a doc to the + // postings but before buffering the norm. Such documents are written + // deleted and will go away on the first merge. + norm = 1L; + } else { + norm = norms.longValue(); + assert norm != 0 : docID; + } + } else { + norm = 1L; + } + + competitiveFreqNormAccumulator.add(writeFreqs ? termDocFreq : 1, norm); + } + + @Override + public void addPosition(int position, BytesRef payload, int startOffset, int endOffset) + throws IOException { + if (position > IndexWriter.MAX_POSITION) { + throw new CorruptIndexException( + "position=" + + position + + " is too large (> IndexWriter.MAX_POSITION=" + + IndexWriter.MAX_POSITION + + ")", + docOut); + } + if (position < 0) { + throw new CorruptIndexException("position=" + position + " is < 0", docOut); + } + posDeltaBuffer[posBufferUpto] = position - lastPosition; + if (writePayloads) { + if (payload == null || payload.length == 0) { + // no payload + payloadLengthBuffer[posBufferUpto] = 0; + } else { + payloadLengthBuffer[posBufferUpto] = payload.length; + if (payloadByteUpto + payload.length > payloadBytes.length) { + payloadBytes = ArrayUtil.grow(payloadBytes, payloadByteUpto + payload.length); + } + System.arraycopy( + payload.bytes, payload.offset, payloadBytes, payloadByteUpto, payload.length); + payloadByteUpto += payload.length; + } + } + + if (writeOffsets) { + assert startOffset >= lastStartOffset; + assert endOffset >= startOffset; + offsetStartDeltaBuffer[posBufferUpto] = startOffset - lastStartOffset; + offsetLengthBuffer[posBufferUpto] = endOffset - startOffset; + lastStartOffset = startOffset; + } + + posBufferUpto++; + lastPosition = position; + if (posBufferUpto == BLOCK_SIZE) { + pforUtil.encode(posDeltaBuffer, posOut); + + if (writePayloads) { + pforUtil.encode(payloadLengthBuffer, payOut); + payOut.writeVInt(payloadByteUpto); + payOut.writeBytes(payloadBytes, 0, payloadByteUpto); + payloadByteUpto = 0; + } + if (writeOffsets) { + pforUtil.encode(offsetStartDeltaBuffer, payOut); + pforUtil.encode(offsetLengthBuffer, payOut); + } + posBufferUpto = 0; + } + } + + @Override + public void finishDoc() throws IOException { + docBufferUpto++; + docCount++; + + lastDocID = docID; + } + + private void flushDocBlock(boolean end) throws IOException { + assert docBufferUpto != 0; + + if (docBufferUpto < BLOCK_SIZE) { + PostingsUtil.writeVIntBlock(blockOutput, docDeltaBuffer, freqBuffer, docBufferUpto, writeFreqs); + } else { + writeImpacts(competitiveFreqNormAccumulator.getCompetitiveFreqNormPairs(), spareOutput); + assert blockOutput.size() == 0; + blockOutput.writeVLong(spareOutput.size()); + spareOutput.copyTo(blockOutput); + spareOutput.reset(); + if (writePositions) { + long blockTTF = Arrays.stream(freqBuffer).sum(); + blockOutput.writeVLong(blockTTF); + } + long numSkipBytes = blockOutput.size(); + forDeltaUtil.encodeDeltas(docDeltaBuffer, blockOutput); + if (writeFreqs) { + pforUtil.encode(freqBuffer, blockOutput); + } + + spareOutput.writeVInt(docID - lastBlockDocID); + spareOutput.writeVLong(blockOutput.size()); + numSkipBytes += spareOutput.size(); + skipOutput.writeVLong(numSkipBytes); + spareOutput.copyTo(skipOutput); + spareOutput.reset(); + } + + blockOutput.copyTo(skipOutput); + blockOutput.reset(); + lastBlockDocID = docID; + skipCompetitiveFreqNormAccumulator.addAll(competitiveFreqNormAccumulator); + competitiveFreqNormAccumulator.clear(); + + if ((docCount & SKIP_MASK) == 0) { + docOut.writeVInt(docID - lastSkipDocID); + writeImpacts(skipCompetitiveFreqNormAccumulator.getCompetitiveFreqNormPairs(), spareOutput); + docOut.writeVLong(spareOutput.size()); + docOut.writeVLong(skipOutput.size()); + if (writePositions) { + docOut.writeVLong(posOut.getFilePointer() - lastSkipPosFP); + docOut.writeVInt(posBufferUpto); + lastSkipPosFP = posOut.getFilePointer(); + + if (writeOffsets || writePayloads) { + docOut.writeVLong(payOut.getFilePointer() - lastSkipPayFP); + docOut.writeVInt(payloadByteUpto); + lastSkipPayFP = payOut.getFilePointer(); + } + } + spareOutput.copyTo(docOut); + spareOutput.reset(); + skipOutput.copyTo(docOut); + skipOutput.reset(); + lastSkipDocID = docID; + skipCompetitiveFreqNormAccumulator.clear(); + } else if (end) { Review Comment: OK I think I see why: when `end` is true, we still have buffered a bunch of blocks (< `SKIP_FACTOR` blocks) which will be written after this pending skip data is written. So this is correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org