kaivalnp commented on code in PR #15979: URL: https://github.com/apache/lucene/pull/15979#discussion_r3398778630
########## lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java: ########## @@ -0,0 +1,1024 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.codecs.dedup; + +import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.KnnVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocsWithFieldSet; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.KnnVectorValues; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.DirectWriter; + +/** + * Writes a {@link DedupFlatVectorsFormat} segment. + * + * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup + * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool, + * then per-field {@code docOrd → vecOrd} maps. + * + * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool + * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code + * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the + * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link + * #finish()} time by re-reading each unique vector from its source segment (sources stay open until + * merge ends). + * + * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path + * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and + * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int + * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are + * all deleted are never copied into the merged pool. + * + * @lucene.experimental + */ +public final class DedupFlatVectorsWriter extends FlatVectorsWriter { + + private static final long SHALLOW_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class); + + private final SegmentWriteState segmentWriteState; + private final IndexOutput meta; + private final IndexOutput vectorData; + + // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id. + private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>(); + private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>(); + + // Pools used during merge. + private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>(); + private final List<MergeFieldState> mergeFields = new ArrayList<>(); + + private boolean finished; + private boolean usedForMerge; + + DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException { + super(scorer); + this.segmentWriteState = state; + String metaFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION); + String vectorDataFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION); + + boolean success = false; + IndexOutput m = null, v = null; + try { + m = state.directory.createOutput(metaFileName, state.context); + v = state.directory.createOutput(vectorDataFileName, state.context); + CodecUtil.writeIndexHeader( + m, + DedupFlatVectorsFormat.META_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + CodecUtil.writeIndexHeader( + v, + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + DedupFlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + this.meta = m; + this.vectorData = v; + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(m, v); + } + } + } + + // --------------------------------------------------------------------------- + // Indexing path (flush) + // --------------------------------------------------------------------------- + + @Override + public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException { + PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding()); + FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new); + FlushFieldWriter<?> w = + switch (fieldInfo.getVectorEncoding()) { + case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool); + case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool); + }; + flushFields.add(w); + return w; + } + + @Override + public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { + if (usedForMerge) { + throw new IllegalStateException( + "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance"); + } + // 1. Write each pool's contiguous unique-vector bytes to .dvc. + long[] poolOffsets = new long[flushPools.size()]; + long[] poolLengths = new long[flushPools.size()]; + assignPoolIds(flushPools.values()); Review Comment: Nice catch, will change! The AI started out with something and I asked it to refine -- looks like it missed this simplification -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
