msokolov commented on code in PR #15979:
URL: https://github.com/apache/lucene/pull/15979#discussion_r3358407756


##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene90.IndexedDISI;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share

Review Comment:
   What if we add other distinguishing factors in the future, e.g. centered
   and rotated vs. not centered and rotated?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share
+ * the same physical vector, dramatically reducing on-disk size when many docs/fields point at the
+ * same vector data.
+ *
+ * <p>The format groups fields into "pools" keyed by {@code (dimension, encoding)}. All fields in

Review Comment:
   For me, "pool" has the connotation of a limited resource that can be
   reserved and returned. What do you think of "variety"?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share
+ * the same physical vector, dramatically reducing on-disk size when many docs/fields point at the
+ * same vector data.
+ *
+ * <p>The format groups fields into "pools" keyed by {@code (dimension, encoding)}. All fields in
+ * the same pool share unique-vector storage; per-field metadata records how each doc-ord maps to a
+ * vector-ord within the pool.
+ *
+ * <h2>.dvc (deduped vector data) file</h2>

Review Comment:
   Hmm, we already have `dvd` for doc-values data, right? Maybe `vpd` (vector
   pool data) or `vvd` (vector variety data)?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, encoding)}) may share
+ * the same physical vector, dramatically reducing on-disk size when many docs/fields point at the
+ * same vector data.
+ *
+ * <p>The format groups fields into "pools" keyed by {@code (dimension, encoding)}. All fields in
+ * the same pool share unique-vector storage; per-field metadata records how each doc-ord maps to a
+ * vector-ord within the pool.
+ *
+ * <h2>.dvc (deduped vector data) file</h2>
+ *
+ * <ul>
+ *   <li>For each pool, in pool-id order:
+ *       <ul>
+ *         <li>64-byte aligned for FLOAT32 (4-byte aligned for BYTE)
+ *         <li>{@code uniqueCount * dim * byteSize} raw vector bytes, addressed by vec-ord
+ *       </ul>
+ *   <li>For each field, in declaration order:
+ *       <ul>
+ *         <li>If sparse: DISI bitset for docsWithField, written by {@link
+ *             IndexedDISI#writeBitSet(DocIdSetIterator, IndexOutput, byte)}, plus a {@link
+ *             org.apache.lucene.util.packed.DirectMonotonicWriter} ord-to-doc table.
+ *         <li>Packed {@code docOrd -> vecOrd} map, written by {@link
+ *             org.apache.lucene.util.packed.DirectWriter} with {@code ceil(log2(poolUniqueCount))}
+ *             bits per value.
+ *       </ul>
+ * </ul>
+ *
+ * <h2>.dvm (deduped vector metadata) file</h2>

Review Comment:
   Maybe `vpm` or `vvm`, to distinguish this better from doc values?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment (sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id.

Review Comment:
   Say why we care about insertion order.
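
   One possible reason, sketched under assumptions: if a pool's id is simply
   its position in the map, then iteration order at write time has to match
   the order in which ids were handed out, which a `LinkedHashMap` guarantees
   and a plain `HashMap` does not. In the snippet below, `PoolKey` is a
   stand-in record and `poolIdFor` / `keysInIdOrder` are hypothetical helpers;
   none of this is taken from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PoolIdOrderSketch {
  /** Stand-in for the PR's PoolKey; the encoding is a plain string here (assumption). */
  record PoolKey(int dimension, String encoding) {}

  // LinkedHashMap iterates in insertion order, so position in the map == pool id.
  private final Map<PoolKey, Integer> poolIds = new LinkedHashMap<>();

  /** Returns the id already assigned to the key, or assigns the next id. */
  int poolIdFor(PoolKey key) {
    Integer id = poolIds.get(key);
    if (id == null) {
      id = poolIds.size();
      poolIds.put(key, id);
    }
    return id;
  }

  /** Keys in iteration order; index i in the list is the key with pool id i. */
  List<PoolKey> keysInIdOrder() {
    return new ArrayList<>(poolIds.keySet());
  }

  public static void main(String[] args) {
    PoolIdOrderSketch pools = new PoolIdOrderSketch();
    pools.poolIdFor(new PoolKey(768, "FLOAT32"));
    pools.poolIdFor(new PoolKey(128, "BYTE"));
    pools.poolIdFor(new PoolKey(768, "FLOAT32")); // same key, same id
    List<PoolKey> ordered = pools.keysInIdOrder();
    for (int id = 0; id < ordered.size(); id++) {
      System.out.println(id + " -> " + ordered.get(id));
    }
  }
}
```

   If that is indeed the reason, a short comment saying so next to the field
   would cover it.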



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment (sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and

Review Comment:
   What does "Level A" mean? The term isn't defined in this Javadoc.
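
   For reference, one reading of the fast path as the Javadoc describes it,
   sketched with stand-in types. `FastPathRemapSketch`, `mergedVecOrd`, the
   `slowIntern` callback and the `-1` sentinel are assumptions for
   illustration, not names from the PR, and a dense `int[]` stands in for the
   sparse array the Javadoc mentions.

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

class FastPathRemapSketch {
  // srcVecOrd -> mergedVecOrd cache; -1 means "not interned yet".
  private final int[] srcToMerged;
  // Stand-in for the hash + byte-verify interning path.
  private final IntUnaryOperator slowIntern;
  // Counts how often the slow path actually ran.
  int slowCalls = 0;

  FastPathRemapSketch(int srcUniqueCount, IntUnaryOperator slowIntern) {
    this.srcToMerged = new int[srcUniqueCount];
    Arrays.fill(srcToMerged, -1);
    this.slowIntern = slowIntern;
  }

  /** Per-doc call; srcVecOrd was already read from the source docOrd -> vecOrd map. */
  int mergedVecOrd(int srcVecOrd) {
    int merged = srcToMerged[srcVecOrd];
    if (merged == -1) {
      // First live doc that references this source vector: intern it once and cache the result.
      merged = slowIntern.applyAsInt(srcVecOrd);
      srcToMerged[srcVecOrd] = merged;
      slowCalls++;
    }
    return merged;
  }

  public static void main(String[] args) {
    // Pretend the merged pool maps source ord n to merged ord 10 + n.
    FastPathRemapSketch remap = new FastPathRemapSketch(4, src -> 10 + src);
    // Source vec-ords referenced by live docs; ords 1 and 3 belong only to deleted docs.
    int[] liveDocVecOrds = {2, 2, 0, 2};
    for (int srcVecOrd : liveDocVecOrds) {
      System.out.println(srcVecOrd + " -> " + remap.mergedVecOrd(srcVecOrd));
    }
    System.out.println("slow-path calls: " + remap.slowCalls);
  }
}
```

   In this sketch, source vec-ords that no live doc references are never
   passed to `slowIntern`, which is how the lazy materialisation in the
   Javadoc reads.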



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment (sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();

Review Comment:
   Rename to `flushFieldWriters`?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment (sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // ---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding());

Review Comment:
   Maybe pass the `FieldInfo` to `PoolKey` directly so it can pick and choose
   which parts of the `FieldInfo` are significant. That way, if the key
   definition changes, the changes are localized.
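
   A minimal sketch of that suggestion, under stated assumptions:
   `FieldInfoStub` is a stand-in for Lucene's `FieldInfo` (exposing only the
   two getters the writer already calls), `Encoding` mirrors the two encodings
   named in the format Javadoc, and `PoolKey.of` is a hypothetical factory
   name rather than anything in the PR.

```java
class PoolKeySketch {
  /** Stand-in for VectorEncoding, limited to the two values the format Javadoc names. */
  enum Encoding { BYTE, FLOAT32 }

  /** Stand-in for org.apache.lucene.index.FieldInfo (assumption: only these getters matter). */
  record FieldInfoStub(String name, int vectorDimension, Encoding vectorEncoding) {
    int getVectorDimension() { return vectorDimension; }
    Encoding getVectorEncoding() { return vectorEncoding; }
  }

  record PoolKey(int dimension, Encoding encoding) {
    /** The single place that decides which field properties distinguish one pool from another. */
    static PoolKey of(FieldInfoStub fieldInfo) {
      return new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding());
    }
  }

  public static void main(String[] args) {
    PoolKey title = PoolKey.of(new FieldInfoStub("title_vec", 128, Encoding.FLOAT32));
    PoolKey body = PoolKey.of(new FieldInfoStub("body_vec", 128, Encoding.FLOAT32));
    PoolKey thumb = PoolKey.of(new FieldInfoStub("thumb_vec", 128, Encoding.BYTE));
    // Fields with the same (dimension, encoding) share a pool; the field name is not part of the key.
    System.out.println("title/body share a pool: " + title.equals(body));
    System.out.println("title/thumb share a pool: " + title.equals(thumb));
  }
}
```

   The call site in `addField` would then read `PoolKey.of(fieldInfo)`, and a
   later distinguishing factor would only touch the factory and the record's
   components.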



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment (sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);

Review Comment:
   On the face of it, I'd expect this to be `flushFieldWriter.write(maxDoc, 
sortMap)`, but maybe `writeFlushField` needs some state from this writer 
(e.g. the `meta` and `vectorData` outputs)?
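
   Roughly the shape I have in mind, as a standalone sketch. All names here 
(`FieldWriteSketch`, `FlushField`, `FieldRecordSink`) are made up for 
illustration and are not in the PR; `new2Old` stands in for the ordinal 
permutation that `writeFlushField` derives from the sort map. The field owns 
the remapping and the parent writer only supplies a sink for its shared outputs:

```java
/** Hypothetical sketch; none of these names come from the PR. */
public class FieldWriteSketch {

  /** Stand-in for the shared outputs (meta + vector data) owned by the parent writer. */
  public interface FieldRecordSink {
    void writeFieldRecord(String fieldName, int[] docOrdToVecOrd);
  }

  /** Stand-in for a per-field flush writer that holds its own docOrd -> vecOrd table. */
  public static final class FlushField {
    private final String name;
    private final int[] docOrdToVecOrd;

    public FlushField(String name, int[] docOrdToVecOrd) {
      this.name = name;
      this.docOrdToVecOrd = docOrdToVecOrd;
    }

    /**
     * Remaps this field's table into the sorted ordinal order and hands it to the sink.
     * A null new2Old means there is no index sort, so the table is copied unchanged.
     */
    public void write(int[] new2Old, FieldRecordSink sink) {
      int[] mapValues = new int[docOrdToVecOrd.length];
      for (int newOrd = 0; newOrd < mapValues.length; newOrd++) {
        int oldOrd = (new2Old == null) ? newOrd : new2Old[newOrd];
        mapValues[newOrd] = docOrdToVecOrd[oldOrd];
      }
      sink.writeFieldRecord(name, mapValues);
    }
  }
}
```

   With this shape the loop in `flush()` would only iterate and delegate. If 
the current method needs more of the writer's state than a sink like this can 
carry, keeping it where it is seems fine.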



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and

Review Comment:
   I think this explanation could be clearer. Is the point that, because the 
source segment is already deduplicated, we can skip the vector comparisons for 
repeated occurrences of the same vector within that segment (comparisons we 
would otherwise have to perform when the source segment is not deduped)? So 
with this optimization we only need to compare each unique source vector 
against the vectors from *other* segments?
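
   To check my understanding, here is a minimal sketch of what I think the 
fast path does. Everything in it is hypothetical (`LazyOrdRemapSketch`, 
`MergedPool`, `remapSegment` are not the PR's names), and a `HashMap` keyed by 
the vector's string form stands in for the real hash + byte-verify step:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names come from the PR. */
public class LazyOrdRemapSketch {

  /** Stand-in for the merged pool: maps vector content to a merged ordinal. */
  public static final class MergedPool {
    private final Map<String, Integer> byContent = new HashMap<>();
    /** Counts how often the expensive intern step (hash + compare) runs. */
    public int internCalls = 0;

    public int intern(float[] vector) {
      internCalls++;
      String key = Arrays.toString(vector); // stand-in for hash + byte comparison
      Integer existing = byContent.get(key);
      if (existing != null) {
        return existing;
      }
      int ord = byContent.size();
      byContent.put(key, ord);
      return ord;
    }

    public int uniqueCount() {
      return byContent.size();
    }
  }

  /**
   * Maps each live doc of one already-deduped source segment to a merged vector ordinal.
   * Deleted docs get -1. Each source unique is interned at most once, on first encounter.
   */
  public static int[] remapSegment(
      MergedPool pool, float[][] srcUniqueVectors, int[] docOrdToVecOrd, boolean[] liveDocs) {
    int[] srcToMerged = new int[srcUniqueVectors.length];
    Arrays.fill(srcToMerged, -1); // -1 means "not interned yet"
    int[] out = new int[docOrdToVecOrd.length];
    for (int docOrd = 0; docOrd < docOrdToVecOrd.length; docOrd++) {
      if (!liveDocs[docOrd]) {
        out[docOrd] = -1;
        continue;
      }
      int srcVecOrd = docOrdToVecOrd[docOrd];
      if (srcToMerged[srcVecOrd] == -1) {
        // Slow step, paid once per source unique rather than once per doc.
        srcToMerged[srcVecOrd] = pool.intern(srcUniqueVectors[srcVecOrd]);
      }
      out[docOrd] = srcToMerged[srcVecOrd]; // fast step: two int reads
    }
    return out;
  }
}
```

   For a source segment with three uniques and five docs, where the only doc 
pointing at the third unique is deleted, this interns twice and the third 
unique never reaches the merged pool. If that matches the intent, something 
along these lines in the javadoc would help.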



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);
+      field.finish();
+    }
+  }
+
+  private void assignPoolIds(Iterable<? extends Pool> pools) {
+    int n = 0;
+    for (Pool pool : pools) {
+      pool.poolId = n++;
+    }
+  }
+
+  private void writePoolsMeta(
+      Iterable<? extends Pool> orderedPools, long[] poolOffsets, long[] 
poolLengths)
+      throws IOException {
+    int numPools = poolOffsets.length;
+    meta.writeVInt(numPools);
+    int p = 0;
+    for (Pool pool : orderedPools) {
+      meta.writeVInt(pool.key.dim);
+      meta.writeByte((byte) pool.key.encoding.ordinal());
+      meta.writeVLong(poolOffsets[p]);
+      meta.writeVLong(poolLengths[p]);
+      meta.writeVInt(pool.uniqueCount());
+      p++;
+    }
+  }
+
+  private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, 
Sorter.DocMap sortMap)
+      throws IOException {
+    int cardinality = field.docsWithField.cardinality();
+    DocsWithFieldSet docsToWrite = field.docsWithField;
+    int[] mapValues = new int[cardinality];
+    if (sortMap == null || cardinality == 0) {
+      for (int i = 0; i < cardinality; i++) {
+        mapValues[i] = field.docOrdToVecOrd.get(i);
+      }
+    } else {
+      int[] old2New = new int[cardinality];
+      int[] new2Old = new int[cardinality];
+      DocsWithFieldSet sortedSet = new DocsWithFieldSet();
+      KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, 
old2New, new2Old, sortedSet);
+      for (int newOrd = 0; newOrd < cardinality; newOrd++) {
+        mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]);
+      }
+      docsToWrite = sortedSet;
+    }
+    writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, 
mapValues);
+  }
+
+  /**
+   * Emit one field's DISI + packed map to {@code .dvc} and one field meta 
record to {@code .dvm}.
+   * Meta-record format (matching read order in {@link 
DedupFlatVectorsReader.FieldEntry}):
+   *
+   * <pre>
+   *   [int]   fieldNumber
+   *   [byte]  similarityOrdinal
+   *   [vint]  poolId
+   *   [int]   cardinality
+   *   ... OrdToDocDISIReaderConfiguration meta (writes inline) ...
+   *   [vlong] mapOffset
+   *   [vlong] mapLength
+   *   [byte]  bitsPerVecOrd
+   * </pre>
+   */
+  private void writeFieldRecord(
+      FieldInfo fieldInfo,
+      Pool pool,
+      int maxDoc,
+      DocsWithFieldSet docsWithField,
+      int[] docOrdToVecOrd)
+      throws IOException {
+    int cardinality = docsWithField.cardinality();
+    meta.writeInt(fieldInfo.number);
+    meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal());
+    meta.writeVInt(pool.poolId);
+    meta.writeInt(cardinality);
+    OrdToDocDISIReaderConfiguration.writeStoredMeta(
+        DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, 
docsWithField);
+    long mapOffset = vectorData.getFilePointer();
+    int bitsPerVecOrd;
+    if (cardinality == 0) {
+      bitsPerVecOrd = 1;
+    } else {
+      int maxVecOrd = Math.max(0, pool.uniqueCount() - 1);
+      bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd));
+      DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, 
bitsPerVecOrd);
+      for (int v : docOrdToVecOrd) {
+        writer.add(v);
+      }
+      writer.finish();
+    }
+    long mapLength = vectorData.getFilePointer() - mapOffset;
+    meta.writeVLong(mapOffset);
+    meta.writeVLong(mapLength);
+    meta.writeByte((byte) bitsPerVecOrd);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Merge path
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public void mergeOneFlatVectorField(FieldInfo fieldInfo, MergeState 
mergeState)
+      throws IOException {
+    if (!flushPools.isEmpty()) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    usedForMerge = true;
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    MergePool pool = mergePools.computeIfAbsent(key, MergePool::new);
+    MergeFieldState fieldState = new MergeFieldState(fieldInfo, pool);
+    mergeFields.add(fieldState);
+
+    switch (fieldInfo.getVectorEncoding()) {
+      case BYTE -> mergeByteField(fieldInfo, mergeState, fieldState);
+      case FLOAT32 -> mergeFloatField(fieldInfo, mergeState, fieldState);
+    }
+  }
+
+  private void mergeFloatField(
+      FieldInfo fieldInfo, MergeState mergeState, MergeFieldState fieldState) 
throws IOException {
+    List<FloatSub> subs = new ArrayList<>();
+    for (int i = 0; i < mergeState.knnVectorsReaders.length; i++) {
+      if (mergeState.knnVectorsReaders[i] == null
+          || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name) == null
+          || 
mergeState.fieldInfos[i].fieldInfo(fieldInfo.name).hasVectorValues() == false) {
+        continue;
+      }
+      FloatVectorValues vals = 
mergeState.knnVectorsReaders[i].getFloatVectorValues(fieldInfo.name);
+      if (vals == null) continue;
+      subs.add(new FloatSub(mergeState.docMaps[i], vals, i));
+    }
+    DocIDMerger<FloatSub> merger = DocIDMerger.of(subs, 
mergeState.needsIndexSort);
+    int dim = fieldInfo.getVectorDimension();
+    byte[] candidateBytes = new byte[dim * Float.BYTES];
+    ByteBuffer bb = 
ByteBuffer.wrap(candidateBytes).order(ByteOrder.LITTLE_ENDIAN);
+    // Level-A fast path setup: when a source reader is itself a dedup format 
reader, attach the
+    // source's docOrd→vecOrd table and a lazily-filled srcVecOrd→mergedVecOrd 
map (shared per
+    // (mergedPool, sourceReader)). The doc loop below interns each source 
vec-ord on first
+    // encounter only — uniques whose docs are all deleted are never interned 
(correctness),

Review Comment:
   What is meant by "(correctness)" here?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);

Review Comment:
   Does this need to be a thread-safe map? Hmm, maybe not: is this writer only 
ever used by a single thread during flush or merge?
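
   For context on why I'm asking, here is the pattern reduced to a standalone 
sketch (hypothetical names, with a list of field names standing in for 
`FlushPool`). Pool ids come from the map's iteration order, so the map has to 
keep insertion order, and `LinkedHashMap` is not synchronized:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these names come from the PR. */
public class PoolIdSketch {

  public enum Encoding { BYTE, FLOAT32 }

  /** Fields with the same (dim, encoding) share one pool. */
  public record PoolKey(int dim, Encoding encoding) {}

  // Insertion order doubles as the pool id, so an order-preserving map is required.
  private final Map<PoolKey, List<String>> pools = new LinkedHashMap<>();

  /** Single-threaded by assumption: LinkedHashMap is not safe for concurrent writers. */
  public void addField(String fieldName, int dim, Encoding encoding) {
    pools.computeIfAbsent(new PoolKey(dim, encoding), k -> new ArrayList<>()).add(fieldName);
  }

  /** Returns the position of the pool in insertion order, or -1 if there is no such pool. */
  public int poolId(int dim, Encoding encoding) {
    PoolKey wanted = new PoolKey(dim, encoding);
    int id = 0;
    for (PoolKey key : pools.keySet()) {
      if (key.equals(wanted)) {
        return id;
      }
      id++;
    }
    return -1;
  }

  public List<String> fieldsInPool(int dim, Encoding encoding) {
    return pools.getOrDefault(new PoolKey(dim, encoding), List.of());
  }
}
```

   If `addField` could ever be called concurrently, swapping in 
`ConcurrentHashMap` would not be enough on its own, since it does not preserve 
insertion order. If the single-thread assumption holds, a one-line comment 
saying so would settle it.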



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment (sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // ---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);
+      field.finish();
+    }
+  }
+
+  private void assignPoolIds(Iterable<? extends Pool> pools) {
+    int n = 0;
+    for (Pool pool : pools) {
+      pool.poolId = n++;
+    }
+  }
+
+  private void writePoolsMeta(
+      Iterable<? extends Pool> orderedPools, long[] poolOffsets, long[] poolLengths)
+      throws IOException {
+    int numPools = poolOffsets.length;
+    meta.writeVInt(numPools);
+    int p = 0;
+    for (Pool pool : orderedPools) {
+      meta.writeVInt(pool.key.dim);
+      meta.writeByte((byte) pool.key.encoding.ordinal());

Review Comment:
   if the pools are ordered (and dense, right?) why do we need to write the ordinals?
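
To make the question concrete, here is a minimal sketch, reading "ordinals" as per-pool ids (an assumption; the quoted line itself writes the encoding's ordinal, which is a separate value). `ImplicitPoolIdSketch`, `PoolMeta`, and the `java.io` streams are hypothetical stand-ins, not the PR's classes or Lucene's `IndexOutput`. If records are written densely in pool order, the reader can take the loop index as the pool id:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ImplicitPoolIdSketch {
  /** Hypothetical in-memory pool record; the id is derived on read, never stored. */
  record PoolMeta(int id, int dim, byte encoding) {}

  /** Writes a count followed by one (dim, encoding) record per pool, in pool order. */
  static byte[] write(int[] dims, byte[] encodings) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(dims.length);
      for (int i = 0; i < dims.length; i++) {
        out.writeInt(dims[i]);
        out.writeByte(encodings[i]);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads the records back; the loop index is the dense pool id. */
  static List<PoolMeta> read(byte[] data) {
    List<PoolMeta> pools = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int numPools = in.readInt();
      for (int id = 0; id < numPools; id++) {
        int dim = in.readInt();
        byte encoding = in.readByte();
        pools.add(new PoolMeta(id, dim, encoding));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return pools;
  }

  public static void main(String[] args) {
    byte[] data = write(new int[] {128, 64}, new byte[] {1, 0});
    for (PoolMeta pool : read(data)) {
      System.out.println(pool);
    }
  }
}
```

This only holds while pools are never skipped or reordered between write and read; a sparse or reordered layout would still need an explicit id.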



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into {@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment (sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // ---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());

Review Comment:
   maybe just put this in the loop below? `pool.poolId = p`
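
A rough sketch of that suggestion, with `SketchPool` and `InlinePoolIdSketch` as hypothetical stand-ins for the PR's `Pool` and the body of `flush()`; the actual byte writing is elided:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the PR's Pool; only the field used here. */
class SketchPool {
  int poolId = -1;
}

class InlinePoolIdSketch {
  /** Walks pools in insertion order and assigns each id in the same loop that writes it. */
  static List<Integer> writePools(Map<String, SketchPool> pools) {
    List<Integer> writtenIds = new ArrayList<>();
    int p = 0;
    for (SketchPool pool : pools.values()) {
      pool.poolId = p; // replaces the separate assignPoolIds(...) pass
      // ... the real loop would align the output and write the pool's bytes here ...
      writtenIds.add(pool.poolId);
      p++;
    }
    return writtenIds;
  }

  public static void main(String[] args) {
    // LinkedHashMap iterates in insertion order, so ids follow the order pools were added.
    Map<String, SketchPool> pools = new LinkedHashMap<>();
    pools.put("128/FLOAT32", new SketchPool());
    pools.put("64/BYTE", new SketchPool());
    System.out.println(writePools(pools));
  }
}
```

This assumes nothing inside the loop reads `poolId` before it is set, and that no other caller (for example the merge path) still needs `assignPoolIds`.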




