mikemccand commented on code in PR #15979: URL: https://github.com/apache/lucene/pull/15979#discussion_r3389913556
########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java: ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.DataAccessHint; +import 
org.apache.lucene.store.FileDataHint; +import org.apache.lucene.store.FileTypeHint; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IOContext.FileOpenHint; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.hnsw.RandomVectorScorer; +import org.apache.lucene.util.packed.DirectReader; + +/** + * Reads a {@link DedupFlatVectorsFormat} segment. + * + * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) followed by per-field DISI + * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field metadata. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsReader extends FlatVectorsReader { + + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class); + + private final FlatVectorsScorer vectorScorer; + private final FlatVectorsScorer translatingScorer; + private final FieldInfos fieldInfos; + private final IndexInput vectorData; + private final IOContext dataContext; + + private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>(); + + public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) + throws IOException { + this(state, scorer, DataAccessHint.RANDOM); + } + + public DedupFlatVectorsReader( + SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint accessHint) + throws IOException { + this.vectorScorer = scorer; + this.translatingScorer = new DedupTranslatingScorer(scorer); + this.fieldInfos = state.fieldInfos; + + int versionMeta; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) { + Throwable priorE = null; + try 
{ + versionMeta = + CodecUtil.checkIndexHeader( + meta, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + readMetaBody(meta, state.fieldInfos); + } catch (Throwable e) { + priorE = e; + throw e; + } finally { + CodecUtil.checkFooter(meta, priorE); + } + } + + FileOpenHint[] hints = + Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint) + .filter(Objects::nonNull) Review Comment: Is this because `accessHint` might be null? Can we just `null` check it up front, and throw `NPE` -- or are callers really allowed to pass `null` in general? ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsFormat; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene90.IndexedDISI; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexOutput; + +/** + * Flat vector format that stores each unique vector exactly once per segment. Multiple documents + * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share + * the same physical vector, dramatically reducing on-disk size when many docs/fields point at the + * same vector data. + * + * <p>The format groups fields into "pools" keyed by {@code (dimension, encoding)}. All fields in + * the same pool share unique-vector storage; per-field metadata records how each doc-ord maps to a Review Comment: Hmm, is encoding the `String` SPI identifier, or, a specific instance of one? ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsFormat; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene90.IndexedDISI; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexOutput; + +/** + * Flat vector format that stores each unique vector exactly once per segment. Multiple documents + * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share + * the same physical vector, dramatically reducing on-disk size when many docs/fields point at the + * same vector data. + * + * <p>The format groups fields into "pools" keyed by {@code (dimension, encoding)}. All fields in + * the same pool share unique-vector storage; per-field metadata records how each doc-ord maps to a + * vector-ord within the pool. + * Review Comment: Maybe add a paragraph saying how dedup is done? Does it store every vector on heap during indexing before the segment is flushed? Or does Lucene already do that today, and we are building a hash map over those (not double-heap-storing)? Maybe state the rough RAM requirements -- is it a 64-bit hash, etc.?
Also explain how merging works -- during merging we are not caching in RAM, but to merge the pools, is it iterating through the hash map? Are vectors sorted, so it's a merge sort at merge time? It's good to explain the high-level approach in these format docs. ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. 
A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int Review Comment: Let's add good test coverage for this: merges where nearly every doc has been deleted, and, sometimes, merges where every doc that had a vector has been deleted (yet some docs without vectors are still merged). ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java: ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.DataAccessHint; +import org.apache.lucene.store.FileDataHint; +import org.apache.lucene.store.FileTypeHint; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IOContext.FileOpenHint; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.hnsw.RandomVectorScorer; +import org.apache.lucene.util.packed.DirectReader; + +/** + * Reads a {@link DedupFlatVectorsFormat} segment. + * + * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) followed by per-field DISI + * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field metadata. 
+ * + * @lucene.experimental + */ +public final class DedupFlatVectorsReader extends FlatVectorsReader { + + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class); + + private final FlatVectorsScorer vectorScorer; + private final FlatVectorsScorer translatingScorer; + private final FieldInfos fieldInfos; + private final IndexInput vectorData; + private final IOContext dataContext; + + private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>(); + + public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) + throws IOException { + this(state, scorer, DataAccessHint.RANDOM); + } + + public DedupFlatVectorsReader( + SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint accessHint) + throws IOException { + this.vectorScorer = scorer; + this.translatingScorer = new DedupTranslatingScorer(scorer); + this.fieldInfos = state.fieldInfos; + + int versionMeta; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) { + Throwable priorE = null; + try { + versionMeta = + CodecUtil.checkIndexHeader( + meta, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + readMetaBody(meta, state.fieldInfos); + } catch (Throwable e) { + priorE = e; + throw e; + } finally { + CodecUtil.checkFooter(meta, priorE); + } + } + + FileOpenHint[] hints = + Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint) + .filter(Objects::nonNull) + .toArray(FileOpenHint[]::new); + dataContext = state.context.withHints(hints); + boolean success = false; + IndexInput data = null; + try { + data = openDataInput(state, versionMeta, dataContext); + this.vectorData = data; + success = true; + } finally { 
+ if (!success) { + IOUtils.closeWhileHandlingException(data); + } + } + } + + private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) throws IOException { + int numPools = meta.readVInt(); + PoolEntry[] poolsLocal = new PoolEntry[numPools]; + for (int p = 0; p < numPools; p++) { + int dim = meta.readVInt(); + int encOrd = meta.readByte(); + VectorEncoding encoding = VectorEncoding.values()[encOrd]; + long offset = meta.readVLong(); + long length = meta.readVLong(); + int uniqueCount = meta.readVInt(); + poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, uniqueCount); + } + for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) { + FieldInfo info = fieldInfos.fieldInfo(fieldNumber); + if (info == null) { + throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta); + } + int simOrd = meta.readByte(); + VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd]; + int poolId = meta.readVInt(); + if (poolId < 0 || poolId >= poolsLocal.length) { + throw new CorruptIndexException( + "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + ")", meta); + } + int cardinality = meta.readInt(); + OrdToDocDISIReaderConfiguration ordToDoc = + OrdToDocDISIReaderConfiguration.fromStoredMeta(meta, cardinality); + long mapOffset = meta.readVLong(); + long mapLength = meta.readVLong(); + int bitsPerVecOrd = meta.readByte(); + FieldEntry entry = + new FieldEntry( + info, + sim, + poolId, + cardinality, + mapOffset, + mapLength, + bitsPerVecOrd, + ordToDoc, + poolsLocal[poolId]); + fields.put(info.number, entry); + } + } + + private static IndexInput openDataInput( + SegmentReadState state, int versionMeta, IOContext context) throws IOException { + String fileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + IndexInput in = state.directory.openInput(fileName, context); + boolean success = 
false; + try { + int versionVectorData = + CodecUtil.checkIndexHeader( + in, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + if (versionMeta != versionVectorData) { + throw new CorruptIndexException( + "Format versions mismatch: meta=" + + versionMeta + + ", " + + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME + + "=" + + versionVectorData, + in); + } + CodecUtil.retrieveChecksum(in); + success = true; + return in; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(in); + } + } + } + + private FieldEntry getFieldEntry(String fieldName, VectorEncoding expected) { + FieldInfo info = fieldInfos.fieldInfo(fieldName); + if (info == null) { + throw new IllegalArgumentException("field=\"" + fieldName + "\" not found"); + } + FieldEntry entry = fields.get(info.number); + if (entry == null) { + throw new IllegalArgumentException( + "field=\"" + fieldName + "\" not encoded by DedupFlatVectorsFormat"); + } + if (entry.pool.encoding != expected) { + throw new IllegalArgumentException( + "field=\"" + + fieldName + + "\" encoded as " + + entry.pool.encoding + + " but expected " + + expected); + } + return entry; + } + + @Override + public FloatVectorValues getFloatVectorValues(String field) throws IOException { + FieldEntry entry = getFieldEntry(field, VectorEncoding.FLOAT32); + return OffHeapDedupFloatVectorValues.create(entry, vectorData, vectorScorer); + } + + @Override + public ByteVectorValues getByteVectorValues(String field) throws IOException { + FieldEntry entry = getFieldEntry(field, VectorEncoding.BYTE); + return OffHeapDedupByteVectorValues.create(entry, vectorData, vectorScorer); + } + + @Override + public FlatVectorsScorer getFlatVectorScorer(String field) throws IOException { + // Used by Lucene99HnswVectorsWriter at merge time: it asks for a scorer over our flat + // values, then builds a 
RandomVectorScorerSupplier from it. The translating wrapper + // detects our OffHeapDedup* values, extracts the pool view, builds an off-heap SIMD + // supplier over the pool, and translates docOrd → vecOrd on every scoring call. + return translatingScorer; + } + + @Override + public RandomVectorScorer getRandomVectorScorer(String field, float[] target) throws IOException { + FieldEntry entry = getFieldEntry(field, VectorEncoding.FLOAT32); + if (entry.cardinality == 0) { + return null; + } + OffHeapDedupFloatVectorValues fieldView = + OffHeapDedupFloatVectorValues.create(entry, vectorData, vectorScorer); + KnnVectorValues poolView = fieldView.poolView(); + RandomVectorScorer poolScorer = + vectorScorer.getRandomVectorScorer(entry.similarity, poolView, target); + int[] docOrdToVecOrd = entry.getOrLoadDocOrdToVecOrd(vectorData); + return DedupTranslatingScorer.wrapScorer(poolScorer, fieldView, docOrdToVecOrd); + } + + @Override + public RandomVectorScorer getRandomVectorScorer(String field, byte[] target) throws IOException { + FieldEntry entry = getFieldEntry(field, VectorEncoding.BYTE); + if (entry.cardinality == 0) { + return null; + } + OffHeapDedupByteVectorValues fieldView = + OffHeapDedupByteVectorValues.create(entry, vectorData, vectorScorer); + KnnVectorValues poolView = fieldView.poolView(); + RandomVectorScorer poolScorer = + vectorScorer.getRandomVectorScorer(entry.similarity, poolView, target); + int[] docOrdToVecOrd = entry.getOrLoadDocOrdToVecOrd(vectorData); + return DedupTranslatingScorer.wrapScorer(poolScorer, fieldView, docOrdToVecOrd); + } + + @Override + public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) { + FieldEntry entry = fields.get(fieldInfo.number); + if (entry == null) { + return Map.of(); + } + return Map.of( + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION, entry.pool.length + entry.mapLength); + } + + @Override + public void checkIntegrity() throws IOException { + CodecUtil.checksumEntireFile(vectorData); + } + + @Override 
+ public FlatVectorsReader getMergeInstance() throws IOException { + vectorData.updateIOContext(dataContext.withHints(DataAccessHint.SEQUENTIAL)); + return this; + } + + @Override + public void finishMerge() throws IOException { + vectorData.updateIOContext(dataContext); + } + + @Override + public void close() throws IOException { + IOUtils.close(vectorData); + } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + fields.ramBytesUsed(); + } + + /** Test-only: look up a field entry by name. Package-private. */ + FieldEntry getFieldEntryForTesting(String fieldName) { + FieldInfo info = fieldInfos.fieldInfo(fieldName); + if (info == null) { + throw new IllegalArgumentException("no such field " + fieldName); + } + return fields.get(info.number); + } + + /** Per-pool runtime state. */ + static final class PoolEntry { + final int poolId; + final int dim; + final VectorEncoding encoding; + final long offset; + final long length; + final int uniqueCount; + final int byteSize; + + PoolEntry( + int poolId, int dim, VectorEncoding encoding, long offset, long length, int uniqueCount) { + this.poolId = poolId; + this.dim = dim; + this.encoding = encoding; + this.offset = offset; + this.length = length; + this.uniqueCount = uniqueCount; + this.byteSize = dim * encoding.byteSize; + } + } + + /** Per-field runtime state. */ + static final class FieldEntry { + final FieldInfo info; + final VectorSimilarityFunction similarity; + final int poolId; + final int cardinality; + final long mapOffset; + final long mapLength; + final int bitsPerVecOrd; + final OrdToDocDISIReaderConfiguration ordToDoc; + final PoolEntry pool; + + /** + * Lazily-materialised dense {@code docOrd → vecOrd} array. Loaded on first scorer build so the + * hot HNSW build/search loop translates with a single primitive int[] read instead of + * dispatching through {@link DirectReader}'s {@link LongValues}. Memory footprint: 4 × + * cardinality bytes per field (e.g. ~4 MB for 1M docs). 
+ */ + private volatile int[] docOrdToVecOrdArray; + + FieldEntry( + FieldInfo info, + VectorSimilarityFunction similarity, + int poolId, + int cardinality, + long mapOffset, + long mapLength, + int bitsPerVecOrd, + OrdToDocDISIReaderConfiguration ordToDoc, + PoolEntry pool) { + this.info = info; + this.similarity = similarity; + this.poolId = poolId; + this.cardinality = cardinality; + this.mapOffset = mapOffset; + this.mapLength = mapLength; + this.bitsPerVecOrd = bitsPerVecOrd; + this.ordToDoc = ordToDoc; + this.pool = pool; + } + + int[] getOrLoadDocOrdToVecOrd(IndexInput vectorData) throws IOException { + int[] cached = docOrdToVecOrdArray; + if (cached != null) { + return cached; + } + synchronized (this) { + cached = docOrdToVecOrdArray; + if (cached != null) { + return cached; + } + cached = loadDocOrdToVecOrd(vectorData); + docOrdToVecOrdArray = cached; + return cached; + } + } + + private int[] loadDocOrdToVecOrd(IndexInput vectorData) throws IOException { + int[] arr = new int[cardinality]; Review Comment: Oh, `cardinality` is maybe actually `maxDoc`? Or is it number of docs that have a vector in this segment? `cardinality` is tricky. It's a synonym for "number", or "count", ... ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsFormat; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene90.IndexedDISI; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexOutput; + +/** + * Flat vector format that stores each unique vector exactly once per segment. Multiple documents + * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share + * the same physical vector, dramatically reducing on-disk size when many docs/fields point at the + * same vector data. + * + * <p>The format groups fields into "pools" keyed by {@code (dimension, encoding)}. All fields in + * the same pool share unique-vector storage; per-field metadata records how each doc-ord maps to a Review Comment: Oh! This is `VectorEncoding` -- `byte[]` vs `float[]` input vectors, right? Cool, so this works for both! ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java: ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.DataAccessHint; +import org.apache.lucene.store.FileDataHint; +import org.apache.lucene.store.FileTypeHint; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IOContext.FileOpenHint; +import org.apache.lucene.store.IndexInput; +import 
org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.hnsw.RandomVectorScorer; +import org.apache.lucene.util.packed.DirectReader; + +/** + * Reads a {@link DedupFlatVectorsFormat} segment. + * + * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) followed by per-field DISI + * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field metadata. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsReader extends FlatVectorsReader { + + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class); + + private final FlatVectorsScorer vectorScorer; + private final FlatVectorsScorer translatingScorer; + private final FieldInfos fieldInfos; + private final IndexInput vectorData; + private final IOContext dataContext; + + private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>(); + + public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) + throws IOException { + this(state, scorer, DataAccessHint.RANDOM); Review Comment: Does this format implement prefetch? Maybe add a `// TODO` and/or open a follow-on issue once we merge? Since this format could "amplify" the randomness (the deref to the original ordinal), prefetch is maybe even more important here than for the non-dup-detecting formats. ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java: ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.DataAccessHint; +import org.apache.lucene.store.FileDataHint; +import org.apache.lucene.store.FileTypeHint; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IOContext.FileOpenHint; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.hnsw.RandomVectorScorer; +import org.apache.lucene.util.packed.DirectReader; + 
+/** + * Reads a {@link DedupFlatVectorsFormat} segment. + * + * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) followed by per-field DISI + * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field metadata. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsReader extends FlatVectorsReader { + + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class); + + private final FlatVectorsScorer vectorScorer; + private final FlatVectorsScorer translatingScorer; + private final FieldInfos fieldInfos; + private final IndexInput vectorData; + private final IOContext dataContext; + + private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>(); + + public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) + throws IOException { + this(state, scorer, DataAccessHint.RANDOM); + } + + public DedupFlatVectorsReader( + SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint accessHint) + throws IOException { + this.vectorScorer = scorer; + this.translatingScorer = new DedupTranslatingScorer(scorer); + this.fieldInfos = state.fieldInfos; + + int versionMeta; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) { + Throwable priorE = null; + try { + versionMeta = + CodecUtil.checkIndexHeader( + meta, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + readMetaBody(meta, state.fieldInfos); + } catch (Throwable e) { + priorE = e; + throw e; + } finally { + CodecUtil.checkFooter(meta, priorE); + } + } + + FileOpenHint[] hints = + Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint) + .filter(Objects::nonNull) + 
.toArray(FileOpenHint[]::new); + dataContext = state.context.withHints(hints); + boolean success = false; + IndexInput data = null; + try { + data = openDataInput(state, versionMeta, dataContext); + this.vectorData = data; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(data); + } + } + } + + private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) throws IOException { + int numPools = meta.readVInt(); + PoolEntry[] poolsLocal = new PoolEntry[numPools]; + for (int p = 0; p < numPools; p++) { + int dim = meta.readVInt(); + int encOrd = meta.readByte(); + VectorEncoding encoding = VectorEncoding.values()[encOrd]; + long offset = meta.readVLong(); + long length = meta.readVLong(); + int uniqueCount = meta.readVInt(); + poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, uniqueCount); + } + for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) { + FieldInfo info = fieldInfos.fieldInfo(fieldNumber); + if (info == null) { + throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta); + } + int simOrd = meta.readByte(); + VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd]; + int poolId = meta.readVInt(); + if (poolId < 0 || poolId >= poolsLocal.length) { + throw new CorruptIndexException( + "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + ")", meta); + } + int cardinality = meta.readInt(); Review Comment: Is this the number of unique vectors? Maybe rename to `numVectors`? ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java: ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.DataAccessHint; +import org.apache.lucene.store.FileDataHint; +import org.apache.lucene.store.FileTypeHint; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IOContext.FileOpenHint; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.IOUtils; +import 
org.apache.lucene.util.LongValues; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.hnsw.RandomVectorScorer; +import org.apache.lucene.util.packed.DirectReader; + +/** + * Reads a {@link DedupFlatVectorsFormat} segment. + * + * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) followed by per-field DISI + * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field metadata. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsReader extends FlatVectorsReader { + + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class); + + private final FlatVectorsScorer vectorScorer; + private final FlatVectorsScorer translatingScorer; + private final FieldInfos fieldInfos; + private final IndexInput vectorData; + private final IOContext dataContext; + + private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>(); + + public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) + throws IOException { + this(state, scorer, DataAccessHint.RANDOM); + } + + public DedupFlatVectorsReader( + SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint accessHint) + throws IOException { + this.vectorScorer = scorer; + this.translatingScorer = new DedupTranslatingScorer(scorer); + this.fieldInfos = state.fieldInfos; + + int versionMeta; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) { + Throwable priorE = null; + try { + versionMeta = + CodecUtil.checkIndexHeader( + meta, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + readMetaBody(meta, state.fieldInfos); + } catch (Throwable e) { + priorE = e; + throw e; + 
} finally { + CodecUtil.checkFooter(meta, priorE); + } + } + + FileOpenHint[] hints = + Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint) + .filter(Objects::nonNull) + .toArray(FileOpenHint[]::new); + dataContext = state.context.withHints(hints); + boolean success = false; + IndexInput data = null; + try { + data = openDataInput(state, versionMeta, dataContext); + this.vectorData = data; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(data); + } + } + } + + private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) throws IOException { + int numPools = meta.readVInt(); + PoolEntry[] poolsLocal = new PoolEntry[numPools]; + for (int p = 0; p < numPools; p++) { + int dim = meta.readVInt(); + int encOrd = meta.readByte(); + VectorEncoding encoding = VectorEncoding.values()[encOrd]; + long offset = meta.readVLong(); + long length = meta.readVLong(); + int uniqueCount = meta.readVInt(); + poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, uniqueCount); + } + for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) { + FieldInfo info = fieldInfos.fieldInfo(fieldNumber); + if (info == null) { + throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta); + } + int simOrd = meta.readByte(); + VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd]; + int poolId = meta.readVInt(); + if (poolId < 0 || poolId >= poolsLocal.length) { + throw new CorruptIndexException( + "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + ")", meta); + } + int cardinality = meta.readInt(); + OrdToDocDISIReaderConfiguration ordToDoc = + OrdToDocDISIReaderConfiguration.fromStoredMeta(meta, cardinality); + long mapOffset = meta.readVLong(); + long mapLength = meta.readVLong(); + int bitsPerVecOrd = meta.readByte(); + FieldEntry entry = + new FieldEntry( + info, + sim, + poolId, + cardinality, + mapOffset, + mapLength, + bitsPerVecOrd, + 
ordToDoc, + poolsLocal[poolId]); + fields.put(info.number, entry); + } + } + + private static IndexInput openDataInput( + SegmentReadState state, int versionMeta, IOContext context) throws IOException { + String fileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + IndexInput in = state.directory.openInput(fileName, context); + boolean success = false; + try { + int versionVectorData = + CodecUtil.checkIndexHeader( + in, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + if (versionMeta != versionVectorData) { + throw new CorruptIndexException( + "Format versions mismatch: meta=" + + versionMeta + + ", " + + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME + + "=" + + versionVectorData, + in); + } + CodecUtil.retrieveChecksum(in); + success = true; + return in; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(in); + } + } + } + + private FieldEntry getFieldEntry(String fieldName, VectorEncoding expected) { + FieldInfo info = fieldInfos.fieldInfo(fieldName); + if (info == null) { + throw new IllegalArgumentException("field=\"" + fieldName + "\" not found"); + } + FieldEntry entry = fields.get(info.number); + if (entry == null) { + throw new IllegalArgumentException( + "field=\"" + fieldName + "\" not encoded by DedupFlatVectorsFormat"); + } + if (entry.pool.encoding != expected) { + throw new IllegalArgumentException( + "field=\"" + + fieldName + + "\" encoded as " Review Comment: `indexed from byte/float vector`? ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java: ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.DataAccessHint; +import org.apache.lucene.store.FileDataHint; +import org.apache.lucene.store.FileTypeHint; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IOContext.FileOpenHint; +import org.apache.lucene.store.IndexInput; +import 
org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.hnsw.RandomVectorScorer; +import org.apache.lucene.util.packed.DirectReader; + +/** + * Reads a {@link DedupFlatVectorsFormat} segment. + * + * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) followed by per-field DISI + * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field metadata. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsReader extends FlatVectorsReader { + + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class); + + private final FlatVectorsScorer vectorScorer; + private final FlatVectorsScorer translatingScorer; + private final FieldInfos fieldInfos; + private final IndexInput vectorData; + private final IOContext dataContext; + + private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>(); + + public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) + throws IOException { + this(state, scorer, DataAccessHint.RANDOM); + } + + public DedupFlatVectorsReader( + SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint accessHint) + throws IOException { + this.vectorScorer = scorer; + this.translatingScorer = new DedupTranslatingScorer(scorer); + this.fieldInfos = state.fieldInfos; + + int versionMeta; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) { + Throwable priorE = null; + try { + versionMeta = + CodecUtil.checkIndexHeader( + meta, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + 
readMetaBody(meta, state.fieldInfos); + } catch (Throwable e) { + priorE = e; + throw e; + } finally { + CodecUtil.checkFooter(meta, priorE); + } + } + + FileOpenHint[] hints = + Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint) + .filter(Objects::nonNull) + .toArray(FileOpenHint[]::new); + dataContext = state.context.withHints(hints); + boolean success = false; + IndexInput data = null; + try { + data = openDataInput(state, versionMeta, dataContext); + this.vectorData = data; + success = true; + } finally { + if (!success) { Review Comment: `== false` instead? Reduces chance of future refactoring bugs by humans at least ... ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java: ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.DataAccessHint; +import org.apache.lucene.store.FileDataHint; +import org.apache.lucene.store.FileTypeHint; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IOContext.FileOpenHint; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.hnsw.RandomVectorScorer; +import org.apache.lucene.util.packed.DirectReader; + +/** + * Reads a {@link DedupFlatVectorsFormat} segment. + * + * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) followed by per-field DISI + * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field metadata. 
+ * + * @lucene.experimental + */ +public final class DedupFlatVectorsReader extends FlatVectorsReader { + + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class); + + private final FlatVectorsScorer vectorScorer; + private final FlatVectorsScorer translatingScorer; + private final FieldInfos fieldInfos; + private final IndexInput vectorData; + private final IOContext dataContext; + + private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>(); + + public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) + throws IOException { + this(state, scorer, DataAccessHint.RANDOM); + } + + public DedupFlatVectorsReader( + SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint accessHint) + throws IOException { + this.vectorScorer = scorer; + this.translatingScorer = new DedupTranslatingScorer(scorer); + this.fieldInfos = state.fieldInfos; + + int versionMeta; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) { + Throwable priorE = null; + try { + versionMeta = + CodecUtil.checkIndexHeader( + meta, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + readMetaBody(meta, state.fieldInfos); + } catch (Throwable e) { + priorE = e; + throw e; + } finally { + CodecUtil.checkFooter(meta, priorE); + } + } + + FileOpenHint[] hints = + Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint) + .filter(Objects::nonNull) + .toArray(FileOpenHint[]::new); + dataContext = state.context.withHints(hints); + boolean success = false; + IndexInput data = null; + try { + data = openDataInput(state, versionMeta, dataContext); + this.vectorData = data; + success = true; + } finally { 
+ if (!success) { + IOUtils.closeWhileHandlingException(data); + } + } + } + + private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) throws IOException { + int numPools = meta.readVInt(); + PoolEntry[] poolsLocal = new PoolEntry[numPools]; + for (int p = 0; p < numPools; p++) { + int dim = meta.readVInt(); + int encOrd = meta.readByte(); + VectorEncoding encoding = VectorEncoding.values()[encOrd]; + long offset = meta.readVLong(); + long length = meta.readVLong(); + int uniqueCount = meta.readVInt(); + poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, uniqueCount); + } + for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) { + FieldInfo info = fieldInfos.fieldInfo(fieldNumber); + if (info == null) { + throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta); + } + int simOrd = meta.readByte(); + VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd]; + int poolId = meta.readVInt(); + if (poolId < 0 || poolId >= poolsLocal.length) { + throw new CorruptIndexException( + "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + ")", meta); + } + int cardinality = meta.readInt(); + OrdToDocDISIReaderConfiguration ordToDoc = + OrdToDocDISIReaderConfiguration.fromStoredMeta(meta, cardinality); + long mapOffset = meta.readVLong(); + long mapLength = meta.readVLong(); + int bitsPerVecOrd = meta.readByte(); + FieldEntry entry = + new FieldEntry( + info, + sim, + poolId, + cardinality, + mapOffset, + mapLength, + bitsPerVecOrd, + ordToDoc, + poolsLocal[poolId]); + fields.put(info.number, entry); + } + } + + private static IndexInput openDataInput( + SegmentReadState state, int versionMeta, IOContext context) throws IOException { + String fileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + IndexInput in = state.directory.openInput(fileName, context); + boolean success = 
false; + try { + int versionVectorData = + CodecUtil.checkIndexHeader( + in, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_START, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + if (versionMeta != versionVectorData) { + throw new CorruptIndexException( + "Format versions mismatch: meta=" + + versionMeta + + ", " + + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME + + "=" + + versionVectorData, + in); + } + CodecUtil.retrieveChecksum(in); + success = true; + return in; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(in); + } + } + } + + private FieldEntry getFieldEntry(String fieldName, VectorEncoding expected) { + FieldInfo info = fieldInfos.fieldInfo(fieldName); + if (info == null) { + throw new IllegalArgumentException("field=\"" + fieldName + "\" not found"); + } + FieldEntry entry = fields.get(info.number); + if (entry == null) { + throw new IllegalArgumentException( + "field=\"" + fieldName + "\" not encoded by DedupFlatVectorsFormat"); Review Comment: Maybe `was not indexed by DedupFlatVectorsFormat in segment X`? ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. 
+ * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int + * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are Review Comment: `source uniques` -> `any source vector whose doc(s) are all deleted is never copied into the merged pool`? ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.lucene.codecs.dedup; + +import java.io.IOException; +import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsFormat; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene90.IndexedDISI; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexOutput; + +/** + * Flat vector format that stores each unique vector exactly once per segment. Multiple documents + * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share + * the same physical vector, dramatically reducing on-disk size when many docs/fields point at the + * same vector data. + * + * <p>The format groups fields into "pools" keyed by {@code (dimension, encoding)}. All fields in Review Comment: How about `organizes fields into groups with the same dimension and input type (byte/float)`? Edit: actually I think `pools` is OK here? It's used throughout ... and it grows on you (well, me at least) as I see it in situ. It reminds me of what ZFS calls "pools" too. ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} 
segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int + * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are + * all deleted are never copied into the merged pool. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsWriter extends FlatVectorsWriter { + + private static final long SHALLOW_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class); + + private final SegmentWriteState segmentWriteState; + private final IndexOutput meta; + private final IndexOutput vectorData; + + // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id. + private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>(); + private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>(); + + // Pools used during merge. 
+ private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>(); Review Comment: Hmm, we also allocate these in the flush case? They just remain empty? Maybe we should factor them out into a dedicated merging utility class? Not sure. ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
+ */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. 
A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int + * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are + * all deleted are never copied into the merged pool. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsWriter extends FlatVectorsWriter { + + private static final long SHALLOW_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class); + + private final SegmentWriteState segmentWriteState; + private final IndexOutput meta; + private final IndexOutput vectorData; + + // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id. + private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>(); + private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>(); + + // Pools used during merge. 
+ private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>(); + private final List<MergeFieldState> mergeFields = new ArrayList<>(); + + private boolean finished; + private boolean usedForMerge; + + DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException { + super(scorer); + this.segmentWriteState = state; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + String vectorDataFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + + boolean success = false; + IndexOutput m = null, v = null; + try { + m = state.directory.createOutput(metaFileName, state.context); + v = state.directory.createOutput(vectorDataFileName, state.context); + CodecUtil.writeIndexHeader( + m, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + CodecUtil.writeIndexHeader( + v, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + this.meta = m; + this.vectorData = v; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(m, v); + } + } + } + + // --------------------------------------------------------------------------- + // Indexing path (flush) + // --------------------------------------------------------------------------- + + @Override + public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException { + PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding()); + FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new); + FlushFieldWriter<?> w = + switch (fieldInfo.getVectorEncoding()) { + case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool); + case FLOAT32 -> new 
FlushFieldWriter.FloatImpl(fieldInfo, pool); + }; + flushFields.add(w); + return w; + } + + @Override + public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { + if (usedForMerge) { + throw new IllegalStateException( + "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance"); + } + // 1. Write each pool's contiguous unique-vector bytes to .dvc. + long[] poolOffsets = new long[flushPools.size()]; + long[] poolLengths = new long[flushPools.size()]; + assignPoolIds(flushPools.values()); + int p = 0; + for (FlushPool pool : flushPools.values()) { + long start = alignOutput(vectorData, pool.key.encoding); + pool.writeBytes(vectorData); + poolOffsets[p] = start; + poolLengths[p] = vectorData.getFilePointer() - start; + p++; + } + // 2. Pool metadata. + writePoolsMeta(flushPools.values(), poolOffsets, poolLengths); + // 3. Per-field DISI + map + meta record. + for (FlushFieldWriter<?> field : flushFields) { + writeFlushField(field, maxDoc, sortMap); + field.finish(); + } + } + + private void assignPoolIds(Iterable<? extends Pool> pools) { + int n = 0; + for (Pool pool : pools) { + pool.poolId = n++; + } + } + + private void writePoolsMeta( + Iterable<? 
extends Pool> orderedPools, long[] poolOffsets, long[] poolLengths) + throws IOException { + int numPools = poolOffsets.length; + meta.writeVInt(numPools); + int p = 0; + for (Pool pool : orderedPools) { + meta.writeVInt(pool.key.dim); + meta.writeByte((byte) pool.key.encoding.ordinal()); + meta.writeVLong(poolOffsets[p]); + meta.writeVLong(poolLengths[p]); + meta.writeVInt(pool.uniqueCount()); + p++; + } + } + + private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, Sorter.DocMap sortMap) + throws IOException { + int cardinality = field.docsWithField.cardinality(); + DocsWithFieldSet docsToWrite = field.docsWithField; + int[] mapValues = new int[cardinality]; + if (sortMap == null || cardinality == 0) { + for (int i = 0; i < cardinality; i++) { + mapValues[i] = field.docOrdToVecOrd.get(i); + } + } else { + int[] old2New = new int[cardinality]; + int[] new2Old = new int[cardinality]; + DocsWithFieldSet sortedSet = new DocsWithFieldSet(); + KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, old2New, new2Old, sortedSet); + for (int newOrd = 0; newOrd < cardinality; newOrd++) { + mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]); + } + docsToWrite = sortedSet; + } + writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, mapValues); + } + + /** + * Emit one field's DISI + packed map to {@code .dvc} and one field meta record to {@code .dvm}. + * Meta-record format (matching read order in {@link DedupFlatVectorsReader.FieldEntry}): + * + * <pre> + * [int] fieldNumber + * [byte] similarityOrdinal + * [vint] poolId + * [int] cardinality + * ... OrdToDocDISIReaderConfiguration meta (writes inline) ... 
+ * [vlong] mapOffset + * [vlong] mapLength + * [byte] bitsPerVecOrd + * </pre> + */ + private void writeFieldRecord( + FieldInfo fieldInfo, + Pool pool, + int maxDoc, + DocsWithFieldSet docsWithField, + int[] docOrdToVecOrd) + throws IOException { + int cardinality = docsWithField.cardinality(); + meta.writeInt(fieldInfo.number); + meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal()); + meta.writeVInt(pool.poolId); + meta.writeInt(cardinality); + OrdToDocDISIReaderConfiguration.writeStoredMeta( + DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, docsWithField); + long mapOffset = vectorData.getFilePointer(); + int bitsPerVecOrd; + if (cardinality == 0) { + bitsPerVecOrd = 1; + } else { + int maxVecOrd = Math.max(0, pool.uniqueCount() - 1); Review Comment: `pool.vectorCount()`? ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. 
A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and Review Comment: I think it's optimizing the common case (merging N segments, all of which use this same format)? In that case I would call it "Step 1" instead of "Level A": merge the pools, i.e. `sourceVecOrd -> mergedVecOrd`. Then "Step 2" is nicely just looking up the `int -> int` remap for the vector ordinals for each doc, similar to how docids are rewritten during postings merge to map around deletions. Very clean/fast! And the laziness is clever too: we don't copy over a vector from the pool if it doesn't appear in the merged segment (none of the remaining live docs have it). ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} 
segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int + * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are + * all deleted are never copied into the merged pool. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsWriter extends FlatVectorsWriter { + + private static final long SHALLOW_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class); + + private final SegmentWriteState segmentWriteState; + private final IndexOutput meta; + private final IndexOutput vectorData; + + // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id. + private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>(); + private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>(); + + // Pools used during merge. 
+ private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>(); + private final List<MergeFieldState> mergeFields = new ArrayList<>(); + + private boolean finished; + private boolean usedForMerge; + + DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException { + super(scorer); + this.segmentWriteState = state; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + String vectorDataFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + + boolean success = false; + IndexOutput m = null, v = null; + try { + m = state.directory.createOutput(metaFileName, state.context); + v = state.directory.createOutput(vectorDataFileName, state.context); + CodecUtil.writeIndexHeader( + m, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + CodecUtil.writeIndexHeader( + v, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + this.meta = m; + this.vectorData = v; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(m, v); + } + } + } + + // --------------------------------------------------------------------------- + // Indexing path (flush) + // --------------------------------------------------------------------------- + + @Override + public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException { + PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding()); + FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new); + FlushFieldWriter<?> w = + switch (fieldInfo.getVectorEncoding()) { + case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool); + case FLOAT32 -> new 
FlushFieldWriter.FloatImpl(fieldInfo, pool); + }; + flushFields.add(w); + return w; + } + + @Override + public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { + if (usedForMerge) { + throw new IllegalStateException( + "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance"); + } + // 1. Write each pool's contiguous unique-vector bytes to .dvc. + long[] poolOffsets = new long[flushPools.size()]; + long[] poolLengths = new long[flushPools.size()]; + assignPoolIds(flushPools.values()); + int p = 0; + for (FlushPool pool : flushPools.values()) { + long start = alignOutput(vectorData, pool.key.encoding); + pool.writeBytes(vectorData); + poolOffsets[p] = start; + poolLengths[p] = vectorData.getFilePointer() - start; + p++; + } + // 2. Pool metadata. + writePoolsMeta(flushPools.values(), poolOffsets, poolLengths); + // 3. Per-field DISI + map + meta record. + for (FlushFieldWriter<?> field : flushFields) { + writeFlushField(field, maxDoc, sortMap); + field.finish(); + } + } + + private void assignPoolIds(Iterable<? extends Pool> pools) { + int n = 0; + for (Pool pool : pools) { + pool.poolId = n++; + } + } + + private void writePoolsMeta( + Iterable<? 
extends Pool> orderedPools, long[] poolOffsets, long[] poolLengths) + throws IOException { + int numPools = poolOffsets.length; + meta.writeVInt(numPools); + int p = 0; + for (Pool pool : orderedPools) { + meta.writeVInt(pool.key.dim); + meta.writeByte((byte) pool.key.encoding.ordinal()); + meta.writeVLong(poolOffsets[p]); + meta.writeVLong(poolLengths[p]); + meta.writeVInt(pool.uniqueCount()); + p++; + } + } + + private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, Sorter.DocMap sortMap) + throws IOException { + int cardinality = field.docsWithField.cardinality(); + DocsWithFieldSet docsToWrite = field.docsWithField; + int[] mapValues = new int[cardinality]; + if (sortMap == null || cardinality == 0) { + for (int i = 0; i < cardinality; i++) { + mapValues[i] = field.docOrdToVecOrd.get(i); + } + } else { + int[] old2New = new int[cardinality]; + int[] new2Old = new int[cardinality]; + DocsWithFieldSet sortedSet = new DocsWithFieldSet(); + KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, old2New, new2Old, sortedSet); + for (int newOrd = 0; newOrd < cardinality; newOrd++) { + mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]); + } + docsToWrite = sortedSet; + } + writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, mapValues); + } + + /** + * Emit one field's DISI + packed map to {@code .dvc} and one field meta record to {@code .dvm}. + * Meta-record format (matching read order in {@link DedupFlatVectorsReader.FieldEntry}): + * + * <pre> + * [int] fieldNumber + * [byte] similarityOrdinal + * [vint] poolId + * [int] cardinality + * ... OrdToDocDISIReaderConfiguration meta (writes inline) ... 
+ * [vlong] mapOffset + * [vlong] mapLength + * [byte] bitsPerVecOrd + * </pre> + */ + private void writeFieldRecord( + FieldInfo fieldInfo, + Pool pool, + int maxDoc, + DocsWithFieldSet docsWithField, + int[] docOrdToVecOrd) + throws IOException { + int cardinality = docsWithField.cardinality(); + meta.writeInt(fieldInfo.number); + meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal()); + meta.writeVInt(pool.poolId); + meta.writeInt(cardinality); + OrdToDocDISIReaderConfiguration.writeStoredMeta( + DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, docsWithField); + long mapOffset = vectorData.getFilePointer(); + int bitsPerVecOrd; + if (cardinality == 0) { + bitsPerVecOrd = 1; + } else { + int maxVecOrd = Math.max(0, pool.uniqueCount() - 1); + bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd)); + DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, bitsPerVecOrd); + for (int v : docOrdToVecOrd) { Review Comment: I'm confused why we need two layers here (`docOrd`)? Some docs might not have a vector, so first layer (today, before this change) is `docid` -> `vectorOrd`, right? But with this change, it seems like it should still be that? Or is `docOrd` maybe a synonym for `docid`? I'm confused :) ########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} 
segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int + * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are + * all deleted are never copied into the merged pool. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsWriter extends FlatVectorsWriter { + + private static final long SHALLOW_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class); + + private final SegmentWriteState segmentWriteState; + private final IndexOutput meta; + private final IndexOutput vectorData; + + // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id. + private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>(); + private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>(); + + // Pools used during merge. 
+ private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>(); + private final List<MergeFieldState> mergeFields = new ArrayList<>(); + + private boolean finished; + private boolean usedForMerge; + + DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException { + super(scorer); + this.segmentWriteState = state; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + String vectorDataFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + + boolean success = false; + IndexOutput m = null, v = null; + try { + m = state.directory.createOutput(metaFileName, state.context); + v = state.directory.createOutput(vectorDataFileName, state.context); + CodecUtil.writeIndexHeader( + m, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + CodecUtil.writeIndexHeader( + v, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + this.meta = m; + this.vectorData = v; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(m, v); + } + } + } + + // --------------------------------------------------------------------------- + // Indexing path (flush) + // --------------------------------------------------------------------------- + + @Override + public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException { + PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding()); + FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new); + FlushFieldWriter<?> w = + switch (fieldInfo.getVectorEncoding()) { + case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool); + case FLOAT32 -> new 
FlushFieldWriter.FloatImpl(fieldInfo, pool); + }; + flushFields.add(w); + return w; + } + + @Override + public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { + if (usedForMerge) { + throw new IllegalStateException( + "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance"); + } + // 1. Write each pool's contiguous unique-vector bytes to .dvc. + long[] poolOffsets = new long[flushPools.size()]; + long[] poolLengths = new long[flushPools.size()]; + assignPoolIds(flushPools.values()); + int p = 0; + for (FlushPool pool : flushPools.values()) { + long start = alignOutput(vectorData, pool.key.encoding); + pool.writeBytes(vectorData); + poolOffsets[p] = start; + poolLengths[p] = vectorData.getFilePointer() - start; + p++; + } + // 2. Pool metadata. + writePoolsMeta(flushPools.values(), poolOffsets, poolLengths); + // 3. Per-field DISI + map + meta record. + for (FlushFieldWriter<?> field : flushFields) { + writeFlushField(field, maxDoc, sortMap); + field.finish(); + } + } + + private void assignPoolIds(Iterable<? extends Pool> pools) { + int n = 0; + for (Pool pool : pools) { + pool.poolId = n++; + } + } + + private void writePoolsMeta( + Iterable<? 
extends Pool> orderedPools, long[] poolOffsets, long[] poolLengths) + throws IOException { + int numPools = poolOffsets.length; + meta.writeVInt(numPools); + int p = 0; + for (Pool pool : orderedPools) { + meta.writeVInt(pool.key.dim); + meta.writeByte((byte) pool.key.encoding.ordinal()); + meta.writeVLong(poolOffsets[p]); + meta.writeVLong(poolLengths[p]); + meta.writeVInt(pool.uniqueCount()); + p++; + } + } + + private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, Sorter.DocMap sortMap) + throws IOException { + int cardinality = field.docsWithField.cardinality(); + DocsWithFieldSet docsToWrite = field.docsWithField; + int[] mapValues = new int[cardinality]; + if (sortMap == null || cardinality == 0) { + for (int i = 0; i < cardinality; i++) { + mapValues[i] = field.docOrdToVecOrd.get(i); + } + } else { + int[] old2New = new int[cardinality]; + int[] new2Old = new int[cardinality]; + DocsWithFieldSet sortedSet = new DocsWithFieldSet(); + KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, old2New, new2Old, sortedSet); + for (int newOrd = 0; newOrd < cardinality; newOrd++) { + mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]); + } + docsToWrite = sortedSet; + } + writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, mapValues); + } + + /** + * Emit one field's DISI + packed map to {@code .dvc} and one field meta record to {@code .dvm}. + * Meta-record format (matching read order in {@link DedupFlatVectorsReader.FieldEntry}): + * + * <pre> + * [int] fieldNumber + * [byte] similarityOrdinal + * [vint] poolId + * [int] cardinality + * ... OrdToDocDISIReaderConfiguration meta (writes inline) ... 
+ * [vlong] mapOffset + * [vlong] mapLength + * [byte] bitsPerVecOrd + * </pre> + */ + private void writeFieldRecord( + FieldInfo fieldInfo, + Pool pool, + int maxDoc, + DocsWithFieldSet docsWithField, + int[] docOrdToVecOrd) + throws IOException { + int cardinality = docsWithField.cardinality(); + meta.writeInt(fieldInfo.number); + meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal()); + meta.writeVInt(pool.poolId); + meta.writeInt(cardinality); + OrdToDocDISIReaderConfiguration.writeStoredMeta( + DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, docsWithField); + long mapOffset = vectorData.getFilePointer(); + int bitsPerVecOrd; + if (cardinality == 0) { + bitsPerVecOrd = 1; + } else { + int maxVecOrd = Math.max(0, pool.uniqueCount() - 1); + bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd)); + DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, bitsPerVecOrd); + for (int v : docOrdToVecOrd) { + writer.add(v); + } + writer.finish(); + } + long mapLength = vectorData.getFilePointer() - mapOffset; + meta.writeVLong(mapOffset); + meta.writeVLong(mapLength); + meta.writeByte((byte) bitsPerVecOrd); + } + + // --------------------------------------------------------------------------- + // Merge path + // --------------------------------------------------------------------------- + + @Override + public void mergeOneFlatVectorField(FieldInfo fieldInfo, MergeState mergeState) + throws IOException { + if (!flushPools.isEmpty()) { + throw new IllegalStateException( + "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance"); + } + usedForMerge = true; + PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding()); + MergePool pool = mergePools.computeIfAbsent(key, MergePool::new); + MergeFieldState fieldState = new MergeFieldState(fieldInfo, pool); + mergeFields.add(fieldState); + + switch (fieldInfo.getVectorEncoding()) { + case BYTE -> 
mergeByteField(fieldInfo, mergeState, fieldState); + case FLOAT32 -> mergeFloatField(fieldInfo, mergeState, fieldState); + } + } + + private void mergeFloatField( + FieldInfo fieldInfo, MergeState mergeState, MergeFieldState fieldState) throws IOException { + List<FloatSub> subs = new ArrayList<>(); + for (int i = 0; i < mergeState.knnVectorsReaders.length; i++) { + if (mergeState.knnVectorsReaders[i] == null + || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name) == null + || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name).hasVectorValues() == false) { + continue; + } + FloatVectorValues vals = mergeState.knnVectorsReaders[i].getFloatVectorValues(fieldInfo.name); + if (vals == null) continue; + subs.add(new FloatSub(mergeState.docMaps[i], vals, i)); + } + DocIDMerger<FloatSub> merger = DocIDMerger.of(subs, mergeState.needsIndexSort); + int dim = fieldInfo.getVectorDimension(); + byte[] candidateBytes = new byte[dim * Float.BYTES]; + ByteBuffer bb = ByteBuffer.wrap(candidateBytes).order(ByteOrder.LITTLE_ENDIAN); + // Level-A fast path setup: when a source reader is itself a dedup format reader, attach the + // source's docOrd→vecOrd table and a lazily-filled srcVecOrd→mergedVecOrd map (shared per + // (mergedPool, sourceReader)). The doc loop below interns each source vec-ord on first + // encounter only — uniques whose docs are all deleted are never interned (correctness), + // and uniques shared by many surviving docs are interned exactly once (perf win). Review Comment: Actually I think this should be considered a correctness issue too, i.e., all dups should be detected and shared? We should never see two different `vecOrd`s pointing to vectors that are actually identical (a false negative)? Hopefully we can `assert` to that effect, and let's, e.g., fix the tests to exercise this: intentionally create many dup vectors across segments, then confirm after force-merge that the merged pool has exactly the expected number of unique vectors.
########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; 
+import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int + * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are + * all deleted are never copied into the merged pool. 
+ * + * @lucene.experimental + */ +public final class DedupFlatVectorsWriter extends FlatVectorsWriter { + + private static final long SHALLOW_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class); + + private final SegmentWriteState segmentWriteState; + private final IndexOutput meta; + private final IndexOutput vectorData; + + // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id. + private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>(); + private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>(); + + // Pools used during merge. + private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>(); + private final List<MergeFieldState> mergeFields = new ArrayList<>(); + + private boolean finished; + private boolean usedForMerge; + + DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException { + super(scorer); + this.segmentWriteState = state; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + String vectorDataFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + + boolean success = false; + IndexOutput m = null, v = null; + try { + m = state.directory.createOutput(metaFileName, state.context); + v = state.directory.createOutput(vectorDataFileName, state.context); + CodecUtil.writeIndexHeader( + m, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + CodecUtil.writeIndexHeader( + v, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + this.meta = m; + this.vectorData = v; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(m, 
v); + } + } + } + + // --------------------------------------------------------------------------- + // Indexing path (flush) + // --------------------------------------------------------------------------- + + @Override + public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException { + PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding()); + FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new); + FlushFieldWriter<?> w = + switch (fieldInfo.getVectorEncoding()) { + case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool); + case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool); + }; + flushFields.add(w); + return w; + } + + @Override + public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { + if (usedForMerge) { + throw new IllegalStateException( + "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance"); + } + // 1. Write each pool's contiguous unique-vector bytes to .dvc. + long[] poolOffsets = new long[flushPools.size()]; + long[] poolLengths = new long[flushPools.size()]; + assignPoolIds(flushPools.values()); + int p = 0; + for (FlushPool pool : flushPools.values()) { + long start = alignOutput(vectorData, pool.key.encoding); + pool.writeBytes(vectorData); + poolOffsets[p] = start; + poolLengths[p] = vectorData.getFilePointer() - start; + p++; + } + // 2. Pool metadata. + writePoolsMeta(flushPools.values(), poolOffsets, poolLengths); + // 3. Per-field DISI + map + meta record. + for (FlushFieldWriter<?> field : flushFields) { + writeFlushField(field, maxDoc, sortMap); + field.finish(); + } + } + + private void assignPoolIds(Iterable<? extends Pool> pools) { + int n = 0; + for (Pool pool : pools) { + pool.poolId = n++; + } + } + + private void writePoolsMeta( + Iterable<? 
extends Pool> orderedPools, long[] poolOffsets, long[] poolLengths) + throws IOException { + int numPools = poolOffsets.length; + meta.writeVInt(numPools); + int p = 0; + for (Pool pool : orderedPools) { + meta.writeVInt(pool.key.dim); + meta.writeByte((byte) pool.key.encoding.ordinal()); + meta.writeVLong(poolOffsets[p]); + meta.writeVLong(poolLengths[p]); + meta.writeVInt(pool.uniqueCount()); + p++; + } + } + + private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, Sorter.DocMap sortMap) + throws IOException { + int cardinality = field.docsWithField.cardinality(); + DocsWithFieldSet docsToWrite = field.docsWithField; + int[] mapValues = new int[cardinality]; + if (sortMap == null || cardinality == 0) { + for (int i = 0; i < cardinality; i++) { + mapValues[i] = field.docOrdToVecOrd.get(i); + } + } else { + int[] old2New = new int[cardinality]; + int[] new2Old = new int[cardinality]; + DocsWithFieldSet sortedSet = new DocsWithFieldSet(); + KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, old2New, new2Old, sortedSet); + for (int newOrd = 0; newOrd < cardinality; newOrd++) { + mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]); + } + docsToWrite = sortedSet; + } + writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, mapValues); + } + + /** + * Emit one field's DISI + packed map to {@code .dvc} and one field meta record to {@code .dvm}. + * Meta-record format (matching read order in {@link DedupFlatVectorsReader.FieldEntry}): + * + * <pre> + * [int] fieldNumber + * [byte] similarityOrdinal + * [vint] poolId + * [int] cardinality + * ... OrdToDocDISIReaderConfiguration meta (writes inline) ... 
+ * [vlong] mapOffset + * [vlong] mapLength + * [byte] bitsPerVecOrd + * </pre> + */ + private void writeFieldRecord( + FieldInfo fieldInfo, + Pool pool, + int maxDoc, + DocsWithFieldSet docsWithField, + int[] docOrdToVecOrd) + throws IOException { + int cardinality = docsWithField.cardinality(); + meta.writeInt(fieldInfo.number); + meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal()); + meta.writeVInt(pool.poolId); + meta.writeInt(cardinality); + OrdToDocDISIReaderConfiguration.writeStoredMeta( + DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, docsWithField); + long mapOffset = vectorData.getFilePointer(); + int bitsPerVecOrd; + if (cardinality == 0) { + bitsPerVecOrd = 1; + } else { + int maxVecOrd = Math.max(0, pool.uniqueCount() - 1); + bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd)); + DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, bitsPerVecOrd); + for (int v : docOrdToVecOrd) { + writer.add(v); + } + writer.finish(); + } + long mapLength = vectorData.getFilePointer() - mapOffset; + meta.writeVLong(mapOffset); + meta.writeVLong(mapLength); + meta.writeByte((byte) bitsPerVecOrd); + } + + // --------------------------------------------------------------------------- + // Merge path + // --------------------------------------------------------------------------- + + @Override + public void mergeOneFlatVectorField(FieldInfo fieldInfo, MergeState mergeState) + throws IOException { + if (!flushPools.isEmpty()) { + throw new IllegalStateException( + "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance"); + } + usedForMerge = true; + PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding()); + MergePool pool = mergePools.computeIfAbsent(key, MergePool::new); + MergeFieldState fieldState = new MergeFieldState(fieldInfo, pool); + mergeFields.add(fieldState); + + switch (fieldInfo.getVectorEncoding()) { + case BYTE -> 
mergeByteField(fieldInfo, mergeState, fieldState); + case FLOAT32 -> mergeFloatField(fieldInfo, mergeState, fieldState); + } + } + + private void mergeFloatField( + FieldInfo fieldInfo, MergeState mergeState, MergeFieldState fieldState) throws IOException { + List<FloatSub> subs = new ArrayList<>(); + for (int i = 0; i < mergeState.knnVectorsReaders.length; i++) { + if (mergeState.knnVectorsReaders[i] == null + || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name) == null + || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name).hasVectorValues() == false) { + continue; + } + FloatVectorValues vals = mergeState.knnVectorsReaders[i].getFloatVectorValues(fieldInfo.name); + if (vals == null) continue; + subs.add(new FloatSub(mergeState.docMaps[i], vals, i)); + } + DocIDMerger<FloatSub> merger = DocIDMerger.of(subs, mergeState.needsIndexSort); + int dim = fieldInfo.getVectorDimension(); + byte[] candidateBytes = new byte[dim * Float.BYTES]; + ByteBuffer bb = ByteBuffer.wrap(candidateBytes).order(ByteOrder.LITTLE_ENDIAN); + // Level-A fast path setup: when a source reader is itself a dedup format reader, attach the + // source's docOrd→vecOrd table and a lazily-filled srcVecOrd→mergedVecOrd map (shared per + // (mergedPool, sourceReader)). The doc loop below interns each source vec-ord on first + // encounter only — uniques whose docs are all deleted are never interned (correctness), + // and uniques shared by many surviving docs are interned exactly once (perf win). Review Comment: And, mix in some deletes too! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
