mikemccand commented on a change in pull request #633: URL: https://github.com/apache/lucene/pull/633#discussion_r819703946
########## File path: lucene/core/src/java/org/apache/lucene/index/FieldInfos.java ########## @@ -352,6 +352,14 @@ public FieldDimensions(int dimensionCount, int indexDimensionCount, int dimensio this.softDeletesFieldName = softDeletesFieldName; } + public void verifyFieldInfo(FieldInfo fi) { Review comment: Should this really be public? Or is it only for `asserting`, in which case maybe make it package private and rename to `boolean assertFieldInfo` or so? ########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ########## @@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) { */ public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); - - // long so we can detect int overflow: Review comment: Hmm we lost this comment? Was that intentional? ########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ########## @@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) { */ public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); - - // long so we can detect int overflow: - long numDocs = 0; long seqNo; - try { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "flush at addIndexes(CodecReader...)"); - } - flush(false, true); + long numDocs = 0; + final int mergeTimeoutInSeconds = 600; - String mergedName = newSegmentName(); - int numSoftDeleted = 0; - for (CodecReader leaf : readers) { - numDocs += leaf.numDocs(); + try { + // Best effort up front validations + for (CodecReader leaf: readers) { validateMergeReader(leaf); - if (softDeletesEnabled) { - Bits liveDocs = leaf.getLiveDocs(); - numSoftDeleted += - PendingSoftDeletes.countSoftDeletes( - DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( - config.getSoftDeletesField(), leaf), - liveDocs); + for (FieldInfo fi: leaf.getFieldInfos()) { + globalFieldNumberMap.verifyFieldInfo(fi); } + numDocs += leaf.numDocs(); } - - // Best-effort up front check: testReserveDocs(numDocs); - final IOContext context = - new IOContext( - new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); + synchronized (this) { + ensureOpen(); + if (merges.areEnabled() == false) { + throw new UnsupportedOperationException("Merges are disabled on current writer. " + Review comment: I think this only happens if IW is aborting/closing? Maybe throw `AlreadyClosedException` instead? This message is otherwise sort of confusing to the poor user who sees it. ########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ########## @@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) { */ public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); - - // long so we can detect int overflow: - long numDocs = 0; long seqNo; - try { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "flush at addIndexes(CodecReader...)"); - } - flush(false, true); + long numDocs = 0; + final int mergeTimeoutInSeconds = 600; - String mergedName = newSegmentName(); - int numSoftDeleted = 0; - for (CodecReader leaf : readers) { - numDocs += leaf.numDocs(); + try { + // Best effort up front validations + for (CodecReader leaf: readers) { validateMergeReader(leaf); - if (softDeletesEnabled) { - Bits liveDocs = leaf.getLiveDocs(); - numSoftDeleted += - PendingSoftDeletes.countSoftDeletes( - DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( - config.getSoftDeletesField(), leaf), - liveDocs); + for (FieldInfo fi: leaf.getFieldInfos()) { + globalFieldNumberMap.verifyFieldInfo(fi); } + numDocs += leaf.numDocs(); } - - // Best-effort up front check: testReserveDocs(numDocs); - final IOContext context = - new IOContext( - new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); + synchronized (this) { + ensureOpen(); + if (merges.areEnabled() == false) { + throw new UnsupportedOperationException("Merges are disabled on current writer. " + + "Cannot execute addIndexes(CodecReaders...) API"); + } + } + + MergePolicy mergePolicy = config.getMergePolicy(); + MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers)); Review comment: Normal merges have the opportunity to "cascade": `MergePolicy` picks a round of initial merges, and as they complete and get swapped in, it is consulted again later and might choose to merge some of those just-merged segments into even bigger ones. But it looks like we don't allow that here? We ask once, do those merges, commit to IW's `SegmentInfos`? Maybe that's fine, it's just different. ########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ########## @@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) { */ public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); - - // long so we can detect int overflow: - long numDocs = 0; long seqNo; - try { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "flush at addIndexes(CodecReader...)"); - } - flush(false, true); + long numDocs = 0; + final int mergeTimeoutInSeconds = 600; - String mergedName = newSegmentName(); - int numSoftDeleted = 0; - for (CodecReader leaf : readers) { - numDocs += leaf.numDocs(); + try { + // Best effort up front validations + for (CodecReader leaf: readers) { validateMergeReader(leaf); - if (softDeletesEnabled) { - Bits liveDocs = leaf.getLiveDocs(); - numSoftDeleted += - PendingSoftDeletes.countSoftDeletes( - DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( - config.getSoftDeletesField(), leaf), - liveDocs); + for (FieldInfo fi: leaf.getFieldInfos()) { + globalFieldNumberMap.verifyFieldInfo(fi); } + numDocs += leaf.numDocs(); } - - // Best-effort up front check: testReserveDocs(numDocs); - final IOContext context = - new IOContext( - new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); + synchronized (this) { + ensureOpen(); + if (merges.areEnabled() == false) { + throw new UnsupportedOperationException("Merges are disabled on current writer. " + + "Cannot execute addIndexes(CodecReaders...) API"); + } + } + + MergePolicy mergePolicy = config.getMergePolicy(); + MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers)); + boolean mergesComplete = false; + if (spec != null && spec.merges.size() > 0) { + try { + spec.merges.forEach(addIndexesMergeSource::registerMerge); + mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES); + mergesComplete = spec.await(); + } finally { + if (!mergesComplete) { + // nocommit -- ensure all intermediate files are deleted + for (MergePolicy.OneMerge merge: spec.merges) { + deleteNewFiles(merge.getMergeInfo().files()); + } + } + } + } - // TODO: somehow we should fix this merge so it's - // abortable so that IW.close(false) is able to stop it - TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory); - Codec codec = config.getCodec(); - // We set the min version to null for now, it will be set later by SegmentMerger - SegmentInfo info = - new SegmentInfo( - directoryOrig, - Version.LATEST, - null, - mergedName, - -1, - false, - codec, - Collections.emptyMap(), - StringHelper.randomId(), - Collections.emptyMap(), - config.getIndexSort()); - - SegmentMerger merger = - new SegmentMerger( - Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context); + if (mergesComplete) { + List<SegmentCommitInfo> infos = new ArrayList<>(); + long totalMaxDoc = 0; + for (MergePolicy.OneMerge merge: spec.merges) { + totalMaxDoc += merge.totalMaxDoc; + if (merge.getMergeInfo() != null) { Review comment: Would this only be `null` if all documents in the merged segment had been deleted? Maybe add a comment? ########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ########## @@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) { */ public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); - - // long so we can detect int overflow: - long numDocs = 0; long seqNo; - try { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "flush at addIndexes(CodecReader...)"); - } - flush(false, true); + long numDocs = 0; + final int mergeTimeoutInSeconds = 600; - String mergedName = newSegmentName(); - int numSoftDeleted = 0; - for (CodecReader leaf : readers) { - numDocs += leaf.numDocs(); + try { + // Best effort up front validations + for (CodecReader leaf: readers) { validateMergeReader(leaf); - if (softDeletesEnabled) { - Bits liveDocs = leaf.getLiveDocs(); - numSoftDeleted += - PendingSoftDeletes.countSoftDeletes( - DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( - config.getSoftDeletesField(), leaf), - liveDocs); + for (FieldInfo fi: leaf.getFieldInfos()) { + globalFieldNumberMap.verifyFieldInfo(fi); } + numDocs += leaf.numDocs(); } - - // Best-effort up front check: testReserveDocs(numDocs); - final IOContext context = - new IOContext( - new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); + synchronized (this) { + ensureOpen(); + if (merges.areEnabled() == false) { + throw new UnsupportedOperationException("Merges are disabled on current writer. " + + "Cannot execute addIndexes(CodecReaders...) API"); + } + } + + MergePolicy mergePolicy = config.getMergePolicy(); + MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers)); + boolean mergesComplete = false; + if (spec != null && spec.merges.size() > 0) { + try { + spec.merges.forEach(addIndexesMergeSource::registerMerge); + mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES); + mergesComplete = spec.await(); + } finally { + if (!mergesComplete) { Review comment: `== false` instead? ########## File path: lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java ########## @@ -39,6 +40,11 @@ public MergeSpecification findMerges( return null; } + @Override + public MergeSpecification findMerges(List<CodecReader> readers) throws IOException { + return null; Review comment: Hmm should `null` even be allowed by this new API? Won't this throw `NPE` if someone tries to `addIndexes`? ########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ########## @@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) { */ public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); - - // long so we can detect int overflow: - long numDocs = 0; long seqNo; - try { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "flush at addIndexes(CodecReader...)"); - } - flush(false, true); + long numDocs = 0; + final int mergeTimeoutInSeconds = 600; - String mergedName = newSegmentName(); - int numSoftDeleted = 0; - for (CodecReader leaf : readers) { - numDocs += leaf.numDocs(); + try { + // Best effort up front validations + for (CodecReader leaf: readers) { validateMergeReader(leaf); - if (softDeletesEnabled) { - Bits liveDocs = leaf.getLiveDocs(); - numSoftDeleted += - PendingSoftDeletes.countSoftDeletes( - DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( - config.getSoftDeletesField(), leaf), - liveDocs); + for (FieldInfo fi: leaf.getFieldInfos()) { + globalFieldNumberMap.verifyFieldInfo(fi); } + numDocs += leaf.numDocs(); } - - // Best-effort up front check: testReserveDocs(numDocs); - final IOContext context = - new IOContext( - new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); + synchronized (this) { + ensureOpen(); + if (merges.areEnabled() == false) { + throw new UnsupportedOperationException("Merges are disabled on current writer. " + + "Cannot execute addIndexes(CodecReaders...) API"); + } + } + + MergePolicy mergePolicy = config.getMergePolicy(); + MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers)); + boolean mergesComplete = false; + if (spec != null && spec.merges.size() > 0) { + try { + spec.merges.forEach(addIndexesMergeSource::registerMerge); + mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES); + mergesComplete = spec.await(); + } finally { + if (!mergesComplete) { + // nocommit -- ensure all intermediate files are deleted + for (MergePolicy.OneMerge merge: spec.merges) { + deleteNewFiles(merge.getMergeInfo().files()); + } + } + } + } - // TODO: somehow we should fix this merge so it's - // abortable so that IW.close(false) is able to stop it - TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory); - Codec codec = config.getCodec(); - // We set the min version to null for now, it will be set later by SegmentMerger - SegmentInfo info = - new SegmentInfo( - directoryOrig, - Version.LATEST, - null, - mergedName, - -1, - false, - codec, - Collections.emptyMap(), - StringHelper.randomId(), - Collections.emptyMap(), - config.getIndexSort()); - - SegmentMerger merger = - new SegmentMerger( - Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context); + if (mergesComplete) { + List<SegmentCommitInfo> infos = new ArrayList<>(); + long totalMaxDoc = 0; + for (MergePolicy.OneMerge merge: spec.merges) { + totalMaxDoc += merge.totalMaxDoc; + if (merge.getMergeInfo() != null) { + infos.add(merge.getMergeInfo()); + } + } - if (!merger.shouldMerge()) { - return docWriter.getNextSequenceNumber(); + // nocommit -- add tests for this transactional behavior + synchronized (this) { + if (infos.isEmpty() == false) { + boolean success = false; + try { + ensureOpen(); + // Reserve the docs, just before we update SIS: + reserveDocs(totalMaxDoc); + success = true; + } finally { + if (!success) { Review comment: `== false`? ########## File path: lucene/core/src/java/org/apache/lucene/index/MergePolicy.java ########## @@ -567,6 +605,21 @@ public abstract MergeSpecification findMerges( MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException; + /** + * Define {@link OneMerge} operations for a list of codec readers. This call is used to + * define merges for input readers in {@link IndexWriter#addIndexes(CodecReader...)}. + * Default implementation adds all readers to a single merge. This can be overridden in custom + * merge policies. + * + * @param readers set of readers to merge into the main index + */ + public MergeSpecification findMerges(List<CodecReader> readers) throws IOException { + OneMerge merge = new OneMerge(readers, leaf -> new MergeReader(leaf, leaf.getLiveDocs())); Review comment: So this means `MergePolicy` by default matches the behavior `IW.addIndexes(CodecReader[])` today -- doing a single large merge. Good! ########## File path: lucene/core/src/java/org/apache/lucene/index/MergePolicy.java ########## @@ -813,12 +866,24 @@ protected final boolean verbose(MergeContext mergeContext) { } static final class MergeReader { + final CodecReader codecReader; final SegmentReader reader; Review comment: Hmm it's a little spooky/confusing/smelly having both of these -- does anything still require accessing via `SegmentReader`? Can they switch to `CodecReader`? ########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ########## @@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) { */ public long addIndexes(CodecReader... readers) throws IOException { ensureOpen(); - - // long so we can detect int overflow: - long numDocs = 0; long seqNo; - try { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "flush at addIndexes(CodecReader...)"); - } - flush(false, true); + long numDocs = 0; + final int mergeTimeoutInSeconds = 600; - String mergedName = newSegmentName(); - int numSoftDeleted = 0; - for (CodecReader leaf : readers) { - numDocs += leaf.numDocs(); + try { + // Best effort up front validations + for (CodecReader leaf: readers) { validateMergeReader(leaf); - if (softDeletesEnabled) { - Bits liveDocs = leaf.getLiveDocs(); - numSoftDeleted += - PendingSoftDeletes.countSoftDeletes( - DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( - config.getSoftDeletesField(), leaf), - liveDocs); + for (FieldInfo fi: leaf.getFieldInfos()) { + globalFieldNumberMap.verifyFieldInfo(fi); } + numDocs += leaf.numDocs(); } - - // Best-effort up front check: testReserveDocs(numDocs); - final IOContext context = - new IOContext( - new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); + synchronized (this) { + ensureOpen(); + if (merges.areEnabled() == false) { + throw new UnsupportedOperationException("Merges are disabled on current writer. " + + "Cannot execute addIndexes(CodecReaders...) API"); + } + } + + MergePolicy mergePolicy = config.getMergePolicy(); + MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers)); + boolean mergesComplete = false; + if (spec != null && spec.merges.size() > 0) { + try { + spec.merges.forEach(addIndexesMergeSource::registerMerge); + mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES); + mergesComplete = spec.await(); + } finally { + if (!mergesComplete) { + // nocommit -- ensure all intermediate files are deleted + for (MergePolicy.OneMerge merge: spec.merges) { + deleteNewFiles(merge.getMergeInfo().files()); + } + } + } + } - // TODO: somehow we should fix this merge so it's - // abortable so that IW.close(false) is able to stop it - TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory); - Codec codec = config.getCodec(); - // We set the min version to null for now, it will be set later by SegmentMerger - SegmentInfo info = - new SegmentInfo( - directoryOrig, - Version.LATEST, - null, - mergedName, - -1, - false, - codec, - Collections.emptyMap(), - StringHelper.randomId(), - Collections.emptyMap(), - config.getIndexSort()); - - SegmentMerger merger = - new SegmentMerger( - Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context); + if (mergesComplete) { + List<SegmentCommitInfo> infos = new ArrayList<>(); + long totalMaxDoc = 0; + for (MergePolicy.OneMerge merge: spec.merges) { + totalMaxDoc += merge.totalMaxDoc; + if (merge.getMergeInfo() != null) { + infos.add(merge.getMergeInfo()); + } + } - if (!merger.shouldMerge()) { - return docWriter.getNextSequenceNumber(); + // nocommit -- add tests for this transactional behavior + synchronized (this) { + if (infos.isEmpty() == false) { + boolean success = false; + try { + ensureOpen(); + // Reserve the docs, just before we update SIS: + reserveDocs(totalMaxDoc); + success = true; + } finally { + if (!success) { + for (SegmentCommitInfo sipc : infos) { + // Safe: these files must exist + deleteNewFiles(sipc.files()); + } + } + } + segmentInfos.addAll(infos); + checkpoint(); + } + seqNo = docWriter.getNextSequenceNumber(); + } + } else { + // We should normally not reach here, as an earlier call should throw an exception. + throw new MergePolicy.MergeException("Could not complete merges within configured timeout of [" + mergeTimeoutInSeconds + "] seconds"); } + } catch (VirtualMachineError tragedy) { + tragicEvent(tragedy, "addIndexes(CodecReader...)"); + throw tragedy; + } - synchronized (this) { - ensureOpen(); - assert merges.areEnabled(); - runningAddIndexesMerges.add(merger); + maybeMerge(); + return seqNo; + } + + private class AddIndexesMergeSource implements MergeScheduler.MergeSource { Review comment: Very cool that we are able to use `MergeSource` for this!! I think this was added to decouple IW and MS so MS could be better tested (by mocking `MergeSource`s). Wonderful to see that this better decoupling now helps nice changes like this, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org