mikemccand commented on a change in pull request #128:
URL: https://github.com/apache/lucene/pull/128#discussion_r642550067



##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -605,209 +680,103 @@ public Status checkIndex(List<String> onlySegments) 
throws IOException {
     result.newSegments.clear();
     result.maxSegmentName = -1;
 
-    for (int i = 0; i < numSegments; i++) {
-      final SegmentCommitInfo info = sis.info(i);
-      long segmentName = Long.parseLong(info.info.name.substring(1), 
Character.MAX_RADIX);
-      if (segmentName > result.maxSegmentName) {
-        result.maxSegmentName = segmentName;
-      }
-      if (onlySegments != null && !onlySegments.contains(info.info.name)) {
-        continue;
-      }
-      Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
-      result.segmentInfos.add(segInfoStat);
-      msg(
-          infoStream,
-          "  "
-              + (1 + i)
-              + " of "
-              + numSegments
-              + ": name="
-              + info.info.name
-              + " maxDoc="
-              + info.info.maxDoc());
-      segInfoStat.name = info.info.name;
-      segInfoStat.maxDoc = info.info.maxDoc();
-
-      final Version version = info.info.getVersion();
-      if (info.info.maxDoc() <= 0) {
-        throw new RuntimeException("illegal number of documents: maxDoc=" + 
info.info.maxDoc());
-      }
-
-      int toLoseDocCount = info.info.maxDoc();
-
-      SegmentReader reader = null;
-
-      try {
-        msg(infoStream, "    version=" + (version == null ? "3.0" : version));
-        msg(infoStream, "    id=" + 
StringHelper.idToString(info.info.getId()));
-        final Codec codec = info.info.getCodec();
-        msg(infoStream, "    codec=" + codec);
-        segInfoStat.codec = codec;
-        msg(infoStream, "    compound=" + info.info.getUseCompoundFile());
-        segInfoStat.compound = info.info.getUseCompoundFile();
-        msg(infoStream, "    numFiles=" + info.files().size());
-        Sort indexSort = info.info.getIndexSort();
-        if (indexSort != null) {
-          msg(infoStream, "    sort=" + indexSort);
-        }
-        segInfoStat.numFiles = info.files().size();
-        segInfoStat.sizeMB = info.sizeInBytes() / (1024. * 1024.);
-        msg(infoStream, "    size (MB)=" + nf.format(segInfoStat.sizeMB));
-        Map<String, String> diagnostics = info.info.getDiagnostics();
-        segInfoStat.diagnostics = diagnostics;
-        if (diagnostics.size() > 0) {
-          msg(infoStream, "    diagnostics = " + diagnostics);
+    // checks segments sequentially
+    if (executorService == null) {

Review comment:
       Maybe we should make a single threaded executor so we don't have to 
bifurcate the code?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -843,6 +812,258 @@ public Status checkIndex(List<String> onlySegments) 
throws IOException {
     return result;
   }
 
+  private void updateMaxSegmentName(Status result, SegmentCommitInfo info) {
+    long segmentName = Long.parseLong(info.info.name.substring(1), 
Character.MAX_RADIX);
+    if (segmentName > result.maxSegmentName) {
+      result.maxSegmentName = segmentName;
+    }
+  }
+
+  private void processSegmentInfoStatusResult(
+      Status result, SegmentCommitInfo info, Status.SegmentInfoStatus 
segmentInfoStatus) {
+    result.segmentInfos.add(segmentInfoStatus);
+    if (segmentInfoStatus.error != null) {
+      result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
+      result.numBadSegments++;
+    } else {
+      // Keeper
+      result.newSegments.add(info.clone());
+    }
+  }
+
+  private <R> CompletableFuture<R> runAsyncSegmentCheck(
+      Callable<R> asyncCallable, ExecutorService executorService) {
+    return CompletableFuture.supplyAsync(callableToSupplier(asyncCallable), 
executorService);
+  }
+
+  private <T> Supplier<T> callableToSupplier(Callable<T> callable) {
+    return () -> {
+      try {
+        return callable.call();
+      } catch (RuntimeException | Error e) {
+        throw e;
+      } catch (Throwable e) {
+        throw new CompletionException(e);
+      }
+    };
+  }
+
+  private Status.SegmentInfoStatus testSegment(
+      SegmentInfos sis, SegmentCommitInfo info, PrintStream infoStream) throws 
IOException {
+    Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
+    segInfoStat.name = info.info.name;
+    segInfoStat.maxDoc = info.info.maxDoc();
+
+    final Version version = info.info.getVersion();
+    if (info.info.maxDoc() <= 0) {
+      throw new CheckIndexException(" illegal number of documents: maxDoc=" + 
info.info.maxDoc());
+    }
+
+    int toLoseDocCount = info.info.maxDoc();

Review comment:
       Hmm, sometimes this seems to be `maxDoc` (here) and other times 
`numDocs` (accounting for deleted documents properly) -- let's try to be 
consistent with what it was before (I think `numDocs`)?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -450,6 +479,14 @@ public void setChecksumsOnly(boolean v) {
 
   private boolean checksumsOnly;
 
+  /** Set threadCount used for parallelizing index integrity checking. */
+  public void setThreadCount(int tc) {
+    threadCount = tc;
+  }
+
+  // capped threadCount at 4
+  private int threadCount = 
Math.min(Runtime.getRuntime().availableProcessors(), 4);

Review comment:
       Whoa, why `4` :)  Could we maybe use java's 
`Runtime.getRuntime().availableProcessorts()`?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -843,6 +812,258 @@ public Status checkIndex(List<String> onlySegments) 
throws IOException {
     return result;
   }
 
+  private void updateMaxSegmentName(Status result, SegmentCommitInfo info) {
+    long segmentName = Long.parseLong(info.info.name.substring(1), 
Character.MAX_RADIX);
+    if (segmentName > result.maxSegmentName) {
+      result.maxSegmentName = segmentName;
+    }
+  }
+
+  private void processSegmentInfoStatusResult(
+      Status result, SegmentCommitInfo info, Status.SegmentInfoStatus 
segmentInfoStatus) {
+    result.segmentInfos.add(segmentInfoStatus);
+    if (segmentInfoStatus.error != null) {
+      result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
+      result.numBadSegments++;
+    } else {
+      // Keeper
+      result.newSegments.add(info.clone());
+    }
+  }
+
+  private <R> CompletableFuture<R> runAsyncSegmentCheck(
+      Callable<R> asyncCallable, ExecutorService executorService) {
+    return CompletableFuture.supplyAsync(callableToSupplier(asyncCallable), 
executorService);
+  }
+
+  private <T> Supplier<T> callableToSupplier(Callable<T> callable) {
+    return () -> {
+      try {
+        return callable.call();
+      } catch (RuntimeException | Error e) {
+        throw e;
+      } catch (Throwable e) {
+        throw new CompletionException(e);
+      }
+    };
+  }
+
+  private Status.SegmentInfoStatus testSegment(
+      SegmentInfos sis, SegmentCommitInfo info, PrintStream infoStream) throws 
IOException {
+    Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
+    segInfoStat.name = info.info.name;
+    segInfoStat.maxDoc = info.info.maxDoc();
+
+    final Version version = info.info.getVersion();
+    if (info.info.maxDoc() <= 0) {
+      throw new CheckIndexException(" illegal number of documents: maxDoc=" + 
info.info.maxDoc());
+    }
+
+    int toLoseDocCount = info.info.maxDoc();
+
+    SegmentReader reader = null;
+
+    try {
+      msg(infoStream, "    version=" + (version == null ? "3.0" : version));
+      msg(infoStream, "    id=" + StringHelper.idToString(info.info.getId()));
+      final Codec codec = info.info.getCodec();
+      msg(infoStream, "    codec=" + codec);
+      segInfoStat.codec = codec;
+      msg(infoStream, "    compound=" + info.info.getUseCompoundFile());
+      segInfoStat.compound = info.info.getUseCompoundFile();
+      msg(infoStream, "    numFiles=" + info.files().size());
+      Sort indexSort = info.info.getIndexSort();
+      if (indexSort != null) {
+        msg(infoStream, "    sort=" + indexSort);
+      }
+      segInfoStat.numFiles = info.files().size();
+      segInfoStat.sizeMB = info.sizeInBytes() / (1024. * 1024.);
+      // nf#format is not thread-safe, and would generate random non valid 
results in concurrent
+      // setting
+      synchronized (nf) {
+        msg(infoStream, "    size (MB)=" + nf.format(segInfoStat.sizeMB));
+      }
+      Map<String, String> diagnostics = info.info.getDiagnostics();
+      segInfoStat.diagnostics = diagnostics;
+      if (diagnostics.size() > 0) {
+        msg(infoStream, "    diagnostics = " + diagnostics);
+      }
+
+      if (!info.hasDeletions()) {
+        msg(infoStream, "    no deletions");
+        segInfoStat.hasDeletions = false;
+      } else {
+        msg(infoStream, "    has deletions [delGen=" + info.getDelGen() + "]");
+        segInfoStat.hasDeletions = true;
+        segInfoStat.deletionsGen = info.getDelGen();
+      }
+
+      long startOpenReaderNS = System.nanoTime();
+      if (infoStream != null) infoStream.print("    test: open 
reader.........");
+      reader = new SegmentReader(info, sis.getIndexCreatedVersionMajor(), 
IOContext.DEFAULT);
+      msg(
+          infoStream,
+          String.format(
+              Locale.ROOT, "OK [took %.3f sec]", nsToSec(System.nanoTime() - 
startOpenReaderNS)));
+
+      segInfoStat.openReaderPassed = true;
+
+      long startIntegrityNS = System.nanoTime();
+      if (infoStream != null) infoStream.print("    test: check 
integrity.....");
+      reader.checkIntegrity();
+      msg(
+          infoStream,
+          String.format(
+              Locale.ROOT, "OK [took %.3f sec]", nsToSec(System.nanoTime() - 
startIntegrityNS)));
+
+      if (reader.maxDoc() != info.info.maxDoc()) {
+        throw new CheckIndexException(
+            "SegmentReader.maxDoc() "
+                + reader.maxDoc()
+                + " != SegmentInfo.maxDoc "
+                + info.info.maxDoc());
+      }
+
+      final int numDocs = reader.numDocs();
+      toLoseDocCount = numDocs;

Review comment:
       Here is it `numDocs` (taking deletions into account).

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -181,6 +193,9 @@
       /** True if we were able to open a CodecReader on this segment. */
       public boolean openReaderPassed;
 
+      /** doc count in this segment */

Review comment:
       Once we decide whether this is `maxDoc` or `docCount` (taking deletions 
into account) let's update this javadoc?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -605,209 +680,103 @@ public Status checkIndex(List<String> onlySegments) 
throws IOException {
     result.newSegments.clear();
     result.maxSegmentName = -1;
 
-    for (int i = 0; i < numSegments; i++) {
-      final SegmentCommitInfo info = sis.info(i);
-      long segmentName = Long.parseLong(info.info.name.substring(1), 
Character.MAX_RADIX);
-      if (segmentName > result.maxSegmentName) {
-        result.maxSegmentName = segmentName;
-      }
-      if (onlySegments != null && !onlySegments.contains(info.info.name)) {
-        continue;
-      }
-      Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
-      result.segmentInfos.add(segInfoStat);
-      msg(
-          infoStream,
-          "  "
-              + (1 + i)
-              + " of "
-              + numSegments
-              + ": name="
-              + info.info.name
-              + " maxDoc="
-              + info.info.maxDoc());
-      segInfoStat.name = info.info.name;
-      segInfoStat.maxDoc = info.info.maxDoc();
-
-      final Version version = info.info.getVersion();
-      if (info.info.maxDoc() <= 0) {
-        throw new RuntimeException("illegal number of documents: maxDoc=" + 
info.info.maxDoc());
-      }
-
-      int toLoseDocCount = info.info.maxDoc();
-
-      SegmentReader reader = null;
-
-      try {
-        msg(infoStream, "    version=" + (version == null ? "3.0" : version));
-        msg(infoStream, "    id=" + 
StringHelper.idToString(info.info.getId()));
-        final Codec codec = info.info.getCodec();
-        msg(infoStream, "    codec=" + codec);
-        segInfoStat.codec = codec;
-        msg(infoStream, "    compound=" + info.info.getUseCompoundFile());
-        segInfoStat.compound = info.info.getUseCompoundFile();
-        msg(infoStream, "    numFiles=" + info.files().size());
-        Sort indexSort = info.info.getIndexSort();
-        if (indexSort != null) {
-          msg(infoStream, "    sort=" + indexSort);
-        }
-        segInfoStat.numFiles = info.files().size();
-        segInfoStat.sizeMB = info.sizeInBytes() / (1024. * 1024.);
-        msg(infoStream, "    size (MB)=" + nf.format(segInfoStat.sizeMB));
-        Map<String, String> diagnostics = info.info.getDiagnostics();
-        segInfoStat.diagnostics = diagnostics;
-        if (diagnostics.size() > 0) {
-          msg(infoStream, "    diagnostics = " + diagnostics);
+    // checks segments sequentially
+    if (executorService == null) {
+      for (int i = 0; i < numSegments; i++) {
+        final SegmentCommitInfo info = sis.info(i);
+        updateMaxSegmentName(result, info);
+        if (onlySegments != null && !onlySegments.contains(info.info.name)) {
+          continue;
         }
 
-        if (!info.hasDeletions()) {
-          msg(infoStream, "    no deletions");
-          segInfoStat.hasDeletions = false;
-        } else {
-          msg(infoStream, "    has deletions [delGen=" + info.getDelGen() + 
"]");
-          segInfoStat.hasDeletions = true;
-          segInfoStat.deletionsGen = info.getDelGen();
-        }
-
-        long startOpenReaderNS = System.nanoTime();
-        if (infoStream != null) infoStream.print("    test: open 
reader.........");
-        reader = new SegmentReader(info, sis.getIndexCreatedVersionMajor(), 
IOContext.DEFAULT);
         msg(
             infoStream,
-            String.format(
-                Locale.ROOT, "OK [took %.3f sec]", nsToSec(System.nanoTime() - 
startOpenReaderNS)));
+            (1 + i)
+                + " of "
+                + numSegments
+                + ": name="
+                + info.info.name
+                + " maxDoc="
+                + info.info.maxDoc());
+        Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, 
infoStream);
+
+        processSegmentInfoStatusResult(result, info, segmentInfoStatus);
+      }
+    } else {
+      ByteArrayOutputStream[] outputs = new ByteArrayOutputStream[numSegments];
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      CompletableFuture<Status.SegmentInfoStatus>[] futures = new 
CompletableFuture[numSegments];
+
+      // checks segments concurrently
+      for (int i = 0; i < numSegments; i++) {
+        final SegmentCommitInfo info = sis.info(i);
+        updateMaxSegmentName(result, info);
+        if (onlySegments != null && !onlySegments.contains(info.info.name)) {
+          continue;
+        }
 
-        segInfoStat.openReaderPassed = true;
+        SegmentInfos finalSis = sis;
 
-        long startIntegrityNS = System.nanoTime();
-        if (infoStream != null) infoStream.print("    test: check 
integrity.....");
-        reader.checkIntegrity();
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrintStream stream;
+        if (i > 0) {
+          // buffer the messages for segment starting from the 2nd one so that 
they can later be
+          // printed in order
+          stream = new PrintStream(output, true, IOUtils.UTF_8);
+        } else {
+          // optimize for first segment to print real-time

Review comment:
       I think it would be better to 1) buffer all segment's output, and 2) 
print each segment's full output once it is done.  This way the tiny segments 
which finish quickly would produce their output, and the large segments would 
be the long poles, finally finishing and printing theirs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to