mikemccand commented on a change in pull request #633:
URL: https://github.com/apache/lucene/pull/633#discussion_r819703946



##########
File path: lucene/core/src/java/org/apache/lucene/index/FieldInfos.java
##########
@@ -352,6 +352,14 @@ public FieldDimensions(int dimensionCount, int indexDimensionCount, int dimensio
       this.softDeletesFieldName = softDeletesFieldName;
     }
 
+    public void verifyFieldInfo(FieldInfo fi) {

Review comment:
      Should this really be public?  Or is it only for `asserting`, in which case maybe make it package private and rename to `boolean assertFieldInfo` or so?
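
A minimal sketch of that suggestion, assuming the consistency checks themselves stay unchanged (the name and placement are hypothetical, not from the PR):

```java
// Package-private, assert-only variant inside FieldInfos.FieldNumbers (sketch):
boolean assertFieldInfo(FieldInfo fi) {
  // ... run the same consistency checks that verifyFieldInfo performs today,
  // throwing IllegalArgumentException on any mismatch ...
  return true; // always true, so call sites can write: assert fieldNumbers.assertFieldInfo(fi);
}
```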

##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:

Review comment:
       Hmm we lost this comment?  Was that intentional?

##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
     long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
 
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
         validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
         }
+        numDocs += leaf.numDocs();
       }
-
-      // Best-effort up front check:
       testReserveDocs(numDocs);
 
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on current writer. " +

Review comment:
      I think this only happens if IW is aborting/closing?  Maybe throw `AlreadyClosedException` instead?  This message is otherwise sort of confusing to the poor user who sees it.
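
If that suggestion were taken, the guard might look roughly like this (a sketch, not the PR's code; `AlreadyClosedException` is the existing class in `org.apache.lucene.store`):

```java
synchronized (this) {
  ensureOpen();
  if (merges.areEnabled() == false) {
    // Merges are only disabled while the writer is aborting/closing, so report that state directly:
    throw new AlreadyClosedException(
        "this IndexWriter is closing, cannot execute addIndexes(CodecReader...)");
  }
}
```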

##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
     long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
 
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
         validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
         }
+        numDocs += leaf.numDocs();
       }
-
-      // Best-effort up front check:
       testReserveDocs(numDocs);
 
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on current writer. " +
+            "Cannot execute addIndexes(CodecReaders...) API");
+        }
+      }
+
+      MergePolicy mergePolicy = config.getMergePolicy();
+      MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers));

Review comment:
      Normal merges have the opportunity to "cascade": `MergePolicy` picks a round of initial merges, and as they complete and get swapped in, it is consulted again later and might choose to merge some of those just-merged segments into even bigger ones.  But it looks like we don't allow that here?  We ask once, do those merges, commit to IW's `SegmentInfos`?  Maybe that's fine, it's just different.
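
For contrast, a hedged sketch of what a cascading variant could look like: keep consulting the `MergePolicy` until it stops proposing merges. `runMerges` and `wrapAsReaders` are placeholders for illustration only; the PR instead asks the policy exactly once.

```java
List<CodecReader> pending = Arrays.asList(readers);
MergePolicy.MergeSpecification spec;
while ((spec = mergePolicy.findMerges(pending)) != null && spec.merges.isEmpty() == false) {
  List<SegmentCommitInfo> merged = runMerges(spec);  // execute this round of merges (placeholder)
  pending = wrapAsReaders(merged);                   // feed the results back to the policy (placeholder)
}
```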

##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
     long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
 
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
         validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
         }
+        numDocs += leaf.numDocs();
       }
-
-      // Best-effort up front check:
       testReserveDocs(numDocs);
 
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on current writer. " +
+            "Cannot execute addIndexes(CodecReaders...) API");
+        }
+      }
+
+      MergePolicy mergePolicy = config.getMergePolicy();
+      MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers));
+      boolean mergesComplete = false;
+      if (spec != null && spec.merges.size() > 0) {
+        try {
+          spec.merges.forEach(addIndexesMergeSource::registerMerge);
+          mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES);
+          mergesComplete = spec.await();
+        } finally {
+          if (!mergesComplete) {
+            // nocommit -- ensure all intermediate files are deleted
+            for (MergePolicy.OneMerge merge: spec.merges) {
+              deleteNewFiles(merge.getMergeInfo().files());
+            }
+          }
+        }
+      }
 
-      // TODO: somehow we should fix this merge so it's
-      // abortable so that IW.close(false) is able to stop it
-      TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
-      Codec codec = config.getCodec();
-      // We set the min version to null for now, it will be set later by SegmentMerger
-      SegmentInfo info =
-          new SegmentInfo(
-              directoryOrig,
-              Version.LATEST,
-              null,
-              mergedName,
-              -1,
-              false,
-              codec,
-              Collections.emptyMap(),
-              StringHelper.randomId(),
-              Collections.emptyMap(),
-              config.getIndexSort());
-
-      SegmentMerger merger =
-          new SegmentMerger(
-              Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context);
+      if (mergesComplete) {
+        List<SegmentCommitInfo> infos = new ArrayList<>();
+        long totalMaxDoc = 0;
+        for (MergePolicy.OneMerge merge: spec.merges) {
+          totalMaxDoc += merge.totalMaxDoc;
+          if (merge.getMergeInfo() != null) {

Review comment:
      Would this only be `null` if all documents in the merged segment had been deleted?  Maybe add a comment?
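
One way to capture that in the code, with the reason stated as an assumption rather than a confirmed fact:

```java
for (MergePolicy.OneMerge merge : spec.merges) {
  totalMaxDoc += merge.totalMaxDoc;
  // getMergeInfo() may be null when the merge produced no segment at all,
  // e.g. if every document in the merged readers was deleted (assumption -- worth confirming).
  if (merge.getMergeInfo() != null) {
    infos.add(merge.getMergeInfo());
  }
}
```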

##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
     long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
 
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
         validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
         }
+        numDocs += leaf.numDocs();
       }
-
-      // Best-effort up front check:
       testReserveDocs(numDocs);
 
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on current writer. " +
+            "Cannot execute addIndexes(CodecReaders...) API");
+        }
+      }
+
+      MergePolicy mergePolicy = config.getMergePolicy();
+      MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers));
+      boolean mergesComplete = false;
+      if (spec != null && spec.merges.size() > 0) {
+        try {
+          spec.merges.forEach(addIndexesMergeSource::registerMerge);
+          mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES);
+          mergesComplete = spec.await();
+        } finally {
+          if (!mergesComplete) {

Review comment:
       `== false` instead?

##########
File path: lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
##########
@@ -39,6 +40,11 @@ public MergeSpecification findMerges(
     return null;
   }
 
+  @Override
+  public MergeSpecification findMerges(List<CodecReader> readers) throws IOException {
+    return null;

Review comment:
      Hmm should `null` even be allowed by this new API?  Won't this throw `NPE` if someone tries to `addIndexes`?
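
One way to avoid the null, sketched as an assumption rather than what the PR ultimately does, is to return an empty specification so callers have nothing to merge but also nothing to null-check:

```java
@Override
public MergeSpecification findMerges(List<CodecReader> readers) throws IOException {
  // An empty spec means "no merges", so IndexWriter.addIndexes(CodecReader...)
  // never dereferences a null specification.
  return new MergeSpecification();
}
```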

##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
     long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
 
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
         validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
         }
+        numDocs += leaf.numDocs();
       }
-
-      // Best-effort up front check:
       testReserveDocs(numDocs);
 
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on current writer. " +
+            "Cannot execute addIndexes(CodecReaders...) API");
+        }
+      }
+
+      MergePolicy mergePolicy = config.getMergePolicy();
+      MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers));
+      boolean mergesComplete = false;
+      if (spec != null && spec.merges.size() > 0) {
+        try {
+          spec.merges.forEach(addIndexesMergeSource::registerMerge);
+          mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES);
+          mergesComplete = spec.await();
+        } finally {
+          if (!mergesComplete) {
+            // nocommit -- ensure all intermediate files are deleted
+            for (MergePolicy.OneMerge merge: spec.merges) {
+              deleteNewFiles(merge.getMergeInfo().files());
+            }
+          }
+        }
+      }
 
-      // TODO: somehow we should fix this merge so it's
-      // abortable so that IW.close(false) is able to stop it
-      TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
-      Codec codec = config.getCodec();
-      // We set the min version to null for now, it will be set later by SegmentMerger
-      SegmentInfo info =
-          new SegmentInfo(
-              directoryOrig,
-              Version.LATEST,
-              null,
-              mergedName,
-              -1,
-              false,
-              codec,
-              Collections.emptyMap(),
-              StringHelper.randomId(),
-              Collections.emptyMap(),
-              config.getIndexSort());
-
-      SegmentMerger merger =
-          new SegmentMerger(
-              Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context);
+      if (mergesComplete) {
+        List<SegmentCommitInfo> infos = new ArrayList<>();
+        long totalMaxDoc = 0;
+        for (MergePolicy.OneMerge merge: spec.merges) {
+          totalMaxDoc += merge.totalMaxDoc;
+          if (merge.getMergeInfo() != null) {
+            infos.add(merge.getMergeInfo());
+          }
+        }
 
-      if (!merger.shouldMerge()) {
-        return docWriter.getNextSequenceNumber();
+        // nocommit -- add tests for this transactional behavior
+        synchronized (this) {
+          if (infos.isEmpty() == false) {
+            boolean success = false;
+            try {
+              ensureOpen();
+              // Reserve the docs, just before we update SIS:
+              reserveDocs(totalMaxDoc);
+              success = true;
+            } finally {
+              if (!success) {

Review comment:
       `== false`?

##########
File path: lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
##########
@@ -567,6 +605,21 @@ public abstract MergeSpecification findMerges(
      MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
       throws IOException;
 
+  /**
+   * Define {@link OneMerge} operations for a list of codec readers. This call is used to
+   * define merges for input readers in {@link IndexWriter#addIndexes(CodecReader...)}.
+   * Default implementation adds all readers to a single merge. This can be overridden in custom
+   * merge policies.
+   *
+   * @param readers set of readers to merge into the main index
+   */
+  public MergeSpecification findMerges(List<CodecReader> readers) throws IOException {
+    OneMerge merge = new OneMerge(readers, leaf -> new MergeReader(leaf, leaf.getLiveDocs()));

Review comment:
      So this means `MergePolicy` by default matches the behavior `IW.addIndexes(CodecReader[])` today -- doing a single large merge.  Good!
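
As a usage illustration of the new extension point, a custom policy could split the incoming readers into several merges, e.g. one `OneMerge` per reader so a concurrent scheduler can run them in parallel. This is a hedged sketch: it assumes the `OneMerge(List<CodecReader>, ...)` constructor added in this PR, and the package-private `MergeReader` factory it takes, are accessible to the subclass, which in practice likely restricts it to the `org.apache.lucene.index` package.

```java
public class OneMergePerReaderPolicy extends FilterMergePolicy {

  public OneMergePerReaderPolicy(MergePolicy in) {
    super(in);
  }

  @Override
  public MergeSpecification findMerges(List<CodecReader> readers) throws IOException {
    MergeSpecification spec = new MergeSpecification();
    for (CodecReader reader : readers) {
      // One merge per incoming reader; a concurrent MergeScheduler can then run them in parallel.
      spec.add(new OneMerge(List.of(reader), leaf -> new MergeReader(leaf, leaf.getLiveDocs())));
    }
    return spec;
  }
}
```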

##########
File path: lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
##########
@@ -813,12 +866,24 @@ protected final boolean verbose(MergeContext mergeContext) {
   }
 
   static final class MergeReader {
+    final CodecReader codecReader;
     final SegmentReader reader;

Review comment:
      Hmm it's a little spooky/confusing/smelly having both of these -- does anything still require accessing via `SegmentReader`?  Can they switch to `CodecReader`?

##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
     long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
 
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
         validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
         }
+        numDocs += leaf.numDocs();
       }
-
-      // Best-effort up front check:
       testReserveDocs(numDocs);
 
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on current writer. " +
+            "Cannot execute addIndexes(CodecReaders...) API");
+        }
+      }
+
+      MergePolicy mergePolicy = config.getMergePolicy();
+      MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers));
+      boolean mergesComplete = false;
+      if (spec != null && spec.merges.size() > 0) {
+        try {
+          spec.merges.forEach(addIndexesMergeSource::registerMerge);
+          mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES);
+          mergesComplete = spec.await();
+        } finally {
+          if (!mergesComplete) {
+            // nocommit -- ensure all intermediate files are deleted
+            for (MergePolicy.OneMerge merge: spec.merges) {
+              deleteNewFiles(merge.getMergeInfo().files());
+            }
+          }
+        }
+      }
 
-      // TODO: somehow we should fix this merge so it's
-      // abortable so that IW.close(false) is able to stop it
-      TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
-      Codec codec = config.getCodec();
-      // We set the min version to null for now, it will be set later by SegmentMerger
-      SegmentInfo info =
-          new SegmentInfo(
-              directoryOrig,
-              Version.LATEST,
-              null,
-              mergedName,
-              -1,
-              false,
-              codec,
-              Collections.emptyMap(),
-              StringHelper.randomId(),
-              Collections.emptyMap(),
-              config.getIndexSort());
-
-      SegmentMerger merger =
-          new SegmentMerger(
-              Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context);
+      if (mergesComplete) {
+        List<SegmentCommitInfo> infos = new ArrayList<>();
+        long totalMaxDoc = 0;
+        for (MergePolicy.OneMerge merge: spec.merges) {
+          totalMaxDoc += merge.totalMaxDoc;
+          if (merge.getMergeInfo() != null) {
+            infos.add(merge.getMergeInfo());
+          }
+        }
 
-      if (!merger.shouldMerge()) {
-        return docWriter.getNextSequenceNumber();
+        // nocommit -- add tests for this transactional behavior
+        synchronized (this) {
+          if (infos.isEmpty() == false) {
+            boolean success = false;
+            try {
+              ensureOpen();
+              // Reserve the docs, just before we update SIS:
+              reserveDocs(totalMaxDoc);
+              success = true;
+            } finally {
+              if (!success) {
+                for (SegmentCommitInfo sipc : infos) {
+                  // Safe: these files must exist
+                  deleteNewFiles(sipc.files());
+                }
+              }
+            }
+            segmentInfos.addAll(infos);
+            checkpoint();
+          }
+          seqNo = docWriter.getNextSequenceNumber();
+        }
+      } else {
+        // We should normally not reach here, as an earlier call should throw an exception.
+        throw new MergePolicy.MergeException("Could not complete merges within configured timeout of [" + mergeTimeoutInSeconds + "] seconds");
       }
+    } catch (VirtualMachineError tragedy) {
+      tragicEvent(tragedy, "addIndexes(CodecReader...)");
+      throw tragedy;
+    }
 
-      synchronized (this) {
-        ensureOpen();
-        assert merges.areEnabled();
-        runningAddIndexesMerges.add(merger);
+    maybeMerge();
+    return seqNo;
+  }
+
+  private class AddIndexesMergeSource implements MergeScheduler.MergeSource {

Review comment:
      Very cool that we are able to use `MergeSource` for this!!  I think this was added to decouple IW and MS so MS could be better tested (by mocking `MergeSource`s).  Wonderful to see that this better decoupling now helps nice changes like this, too.
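
To illustrate the decoupling this comment praises, here is a rough stub of a `MergeScheduler.MergeSource` that a scheduler test could drive without an `IndexWriter`. The method set follows the `MergeScheduler.MergeSource` interface; the queueing and bookkeeping details are made up for the sketch.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;

class StubMergeSource implements MergeScheduler.MergeSource {
  private final Queue<MergePolicy.OneMerge> pending = new ArrayDeque<>();
  private final List<MergePolicy.OneMerge> finished = new ArrayList<>();

  synchronized void enqueue(MergePolicy.OneMerge merge) {
    pending.add(merge);
  }

  @Override
  public synchronized MergePolicy.OneMerge getNextMerge() {
    return pending.poll(); // null once drained, mirroring IndexWriter's own MergeSource
  }

  @Override
  public synchronized void onMergeFinished(MergePolicy.OneMerge merge) {
    finished.add(merge); // recorded so a test can assert on completed merges
  }

  @Override
  public synchronized boolean hasPendingMerges() {
    return pending.isEmpty() == false;
  }

  @Override
  public void merge(MergePolicy.OneMerge merge) throws IOException {
    // A real source actually runs the merge; the stub just marks it complete.
    onMergeFinished(merge);
  }
}
```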




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
