s1monw commented on a change in pull request #1397: LUCENE-9304: Refactor DWPTPool to pool DWPT directly
URL: https://github.com/apache/lucene-solr/pull/1397#discussion_r404634467
 
 

 ##########
 File path: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
 ##########
 @@ -16,228 +16,173 @@
  */
 package org.apache.lucene.index;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Set;
+import java.util.function.Predicate;
 
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
- * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
- * and their thread assignments during indexing. Each {@link ThreadState} holds
- * a reference to a {@link DocumentsWriterPerThread} that is once a
- * {@link ThreadState} is obtained from the pool exclusively used for indexing a
- * single document by the obtaining thread. Each indexing thread must obtain
- * such a {@link ThreadState} to make progress. Depending on the
- * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState}
+ * {@link DocumentsWriterPerThreadPool} controls {@link DocumentsWriterPerThread} instances
+ * and their thread assignments during indexing. Each {@link DocumentsWriterPerThread} is once a
+ * obtained from the pool exclusively used for indexing a
+ * single document or list of documents by the obtaining thread. Each indexing thread must obtain
+ * such a {@link DocumentsWriterPerThread} to make progress. Depending on the
+ * {@link DocumentsWriterPerThreadPool} implementation {@link DocumentsWriterPerThread}
  * assignments might differ from document to document.
  * <p>
- * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool
- * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a
- * new {@link DocumentsWriterPerThread} instance.
+ * Once a {@link DocumentsWriterPerThread} is selected for flush the {@link DocumentsWriterPerThread} will
+ * be checked out of the thread pool and won't be reused for indexing. See {@link #checkout(DocumentsWriterPerThread)}.
  * </p>
  */
-final class DocumentsWriterPerThreadPool {
-  
-  /**
-   * {@link ThreadState} references and guards a
-   * {@link DocumentsWriterPerThread} instance that is used during indexing to
-   * build a in-memory index segment. {@link ThreadState} also holds all flush
-   * related per-thread data controlled by {@link DocumentsWriterFlushControl}.
-   * <p>
-   * A {@link ThreadState}, its methods and members should only accessed by one
-   * thread a time. Users must acquire the lock via {@link ThreadState#lock()}
-   * and release the lock in a finally block via {@link ThreadState#unlock()}
-   * before accessing the state.
-   */
-  @SuppressWarnings("serial")
-  final static class ThreadState extends ReentrantLock {
-    DocumentsWriterPerThread dwpt;
-    // TODO this should really be part of DocumentsWriterFlushControl
-    // write access guarded by DocumentsWriterFlushControl
-    volatile boolean flushPending = false;
-    // TODO this should really be part of DocumentsWriterFlushControl
-    // write access guarded by DocumentsWriterFlushControl
-    long bytesUsed = 0;
-
-    // set by DocumentsWriter after each indexing op finishes
-    volatile long lastSeqNo;
-
-    ThreadState(DocumentsWriterPerThread dpwt) {
-      this.dwpt = dpwt;
-    }
-    
-    private void reset() {
-      assert this.isHeldByCurrentThread();
-      this.dwpt = null;
-      this.bytesUsed = 0;
-      this.flushPending = false;
-    }
-    
-    boolean isInitialized() {
-      assert this.isHeldByCurrentThread();
-      return dwpt != null;
-    }
-    
-    /**
-     * Returns the number of currently active bytes in this ThreadState's
-     * {@link DocumentsWriterPerThread}
-     */
-    public long getBytesUsedPerThread() {
-      assert this.isHeldByCurrentThread();
-      // public for FlushPolicy
-      return bytesUsed;
-    }
-    
-    /**
-     * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread}
-     */
-    public DocumentsWriterPerThread getDocumentsWriterPerThread() {
-      assert this.isHeldByCurrentThread();
-      // public for FlushPolicy
-      return dwpt;
-    }
-    
-    /**
-     * Returns <code>true</code> iff this {@link ThreadState} is marked as flush
-     * pending otherwise <code>false</code>
-     */
-    public boolean isFlushPending() {
-      return flushPending;
-    }
-  }
+final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable {
 
-  private final List<ThreadState> threadStates = new ArrayList<>();
+  private final Set<DocumentsWriterPerThread> dwpts = Collections.newSetFromMap(new IdentityHashMap<>());
+  private final List<DocumentsWriterPerThread> freeList = new ArrayList<>();
+  private final IOSupplier<DocumentsWriterPerThread> dwptFactory;
+  private int takenWriterPermits = 0;
+  private boolean closed;
 
-  private final List<ThreadState> freeList = new ArrayList<>();
 
-  private int takenThreadStatePermits = 0;
+  DocumentsWriterPerThreadPool(IOSupplier<DocumentsWriterPerThread> dwptFactory) {
+    this.dwptFactory = dwptFactory;
+  }
 
   /**
-   * Returns the active number of {@link ThreadState} instances.
+   * Returns the active number of {@link DocumentsWriterPerThread} instances.
    */
-  synchronized int getActiveThreadStateCount() {
-    return threadStates.size();
+  synchronized int size() {
+    return dwpts.size();
   }
 
-  synchronized void lockNewThreadStates() {
-    // this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0
-    // any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some
+  synchronized void lockNewWriters() {
+    // this is similar to a semaphore - we need to acquire all permits ie. takenWriterPermits must be == 0
+    // any call to lockNewWriters() must be followed by unlockNewWriters() otherwise we will deadlock at some
     // point
-    assert takenThreadStatePermits >= 0;
-    takenThreadStatePermits++;
+    assert takenWriterPermits >= 0;
+    takenWriterPermits++;
   }
 
-  synchronized void unlockNewThreadStates() {
-    assert takenThreadStatePermits > 0;
-    takenThreadStatePermits--;
-    if (takenThreadStatePermits == 0) {
+  synchronized void unlockNewWriters() {
+    assert takenWriterPermits > 0;
+    takenWriterPermits--;
+    if (takenWriterPermits == 0) {
       notifyAll();
     }
   }
+
   /**
-   * Returns a new {@link ThreadState} iff any new state is available otherwise
-   * <code>null</code>.
-   * <p>
-   * NOTE: the returned {@link ThreadState} is already locked iff non-
-   * <code>null</code>.
-   * 
-   * @return a new {@link ThreadState} iff any new state is available otherwise
-   *         <code>null</code>
+   * Returns a new already locked {@link DocumentsWriterPerThread}
+   *
+   * @return a new {@link DocumentsWriterPerThread}
    */
-  private synchronized ThreadState newThreadState() {
-    assert takenThreadStatePermits >= 0;
-    while (takenThreadStatePermits > 0) {
-      // we can't create new thread-states while not all permits are available
+  private synchronized DocumentsWriterPerThread newWriter() throws IOException {
+    assert takenWriterPermits >= 0;
+    while (takenWriterPermits > 0) {
+      // we can't create new DWPTs while not all permits are available
       try {
         wait();
       } catch (InterruptedException ie) {
         throw new ThreadInterruptedException(ie);
       }
     }
-    ThreadState threadState = new ThreadState(null);
-    threadState.lock(); // lock so nobody else will get this ThreadState
-    threadStates.add(threadState);
-    return threadState;
-}
-
-  DocumentsWriterPerThread reset(ThreadState threadState) {
-    assert threadState.isHeldByCurrentThread();
-    final DocumentsWriterPerThread dwpt = threadState.dwpt;
-    threadState.reset();
+    DocumentsWriterPerThread dwpt = dwptFactory.get();
+    dwpt.lock(); // lock so nobody else will get this DWPT
+    dwpts.add(dwpt);
     return dwpt;
   }
-  
-  void recycle(DocumentsWriterPerThread dwpt) {
-    // don't recycle DWPT by default
-  }
 
   // TODO: maybe we should try to do load leveling here: we want roughly even numbers
   // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
 
-  /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
-  ThreadState getAndLock() {
-    ThreadState threadState = null;
+  /** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */
+  DocumentsWriterPerThread getAndLock() throws IOException {
     synchronized (this) {
-      if (freeList.isEmpty()) {
-        // ThreadState is already locked before return by this method:
-        return newThreadState();
-      } else {
-        // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
-        threadState = freeList.remove(freeList.size()-1);
-
-        if (threadState.dwpt == null) {
-          // This thread-state is not initialized, e.g. it
-          // was just flushed. See if we can instead find
-          // another free thread state that already has docs
-          // indexed. This way if incoming thread concurrency
-          // has decreased, we don't leave docs
-          // indefinitely buffered, tying up RAM.  This
-          // will instead get those thread states flushed,
-          // freeing up RAM for larger segment flushes:
-          for(int i=0;i<freeList.size();i++) {
-            ThreadState ts = freeList.get(i);
-            if (ts.dwpt != null) {
-              // Use this one instead, and swap it with
-              // the un-initialized one:
-              freeList.set(i, threadState);
-              threadState = ts;
-              break;
-            }
-          }
+      if (closed) {
+        throw new AlreadyClosedException("DWPTPool is already closed");
+      }
+      // Important that we are LIFO here! This way if number of concurrent indexing threads was once high,
+      // but has now reduced, we only use a limited number of DWPTs. This also guarantees that if we have suddenly
+      // a single thread indexing
+      for (int i = freeList.size()-1; i >= 0; i--) {
 
 Review comment:
   👍 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to