s1monw commented on a change in pull request #1397: LUCENE-9304: Refactor DWPTPool to pool DWPT directly URL: https://github.com/apache/lucene-solr/pull/1397#discussion_r404634467
########## File path: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java ########## @@ -16,228 +16,173 @@ */ package org.apache.lucene.index; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; +import java.util.Set; +import java.util.function.Predicate; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.IOSupplier; import org.apache.lucene.util.ThreadInterruptedException; /** - * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances - * and their thread assignments during indexing. Each {@link ThreadState} holds - * a reference to a {@link DocumentsWriterPerThread} that is once a - * {@link ThreadState} is obtained from the pool exclusively used for indexing a - * single document by the obtaining thread. Each indexing thread must obtain - * such a {@link ThreadState} to make progress. Depending on the - * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState} + * {@link DocumentsWriterPerThreadPool} controls {@link DocumentsWriterPerThread} instances + * and their thread assignments during indexing. Each {@link DocumentsWriterPerThread} is once a + * obtained from the pool exclusively used for indexing a + * single document or list of documents by the obtaining thread. Each indexing thread must obtain + * such a {@link DocumentsWriterPerThread} to make progress. Depending on the + * {@link DocumentsWriterPerThreadPool} implementation {@link DocumentsWriterPerThread} * assignments might differ from document to document. * <p> - * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool - * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a - * new {@link DocumentsWriterPerThread} instance. + * Once a {@link DocumentsWriterPerThread} is selected for flush the {@link DocumentsWriterPerThread} will + * be checked out of the thread pool and won't be reused for indexing. See {@link #checkout(DocumentsWriterPerThread)}. * </p> */ -final class DocumentsWriterPerThreadPool { - - /** - * {@link ThreadState} references and guards a - * {@link DocumentsWriterPerThread} instance that is used during indexing to - * build a in-memory index segment. {@link ThreadState} also holds all flush - * related per-thread data controlled by {@link DocumentsWriterFlushControl}. - * <p> - * A {@link ThreadState}, its methods and members should only accessed by one - * thread a time. Users must acquire the lock via {@link ThreadState#lock()} - * and release the lock in a finally block via {@link ThreadState#unlock()} - * before accessing the state. - */ - @SuppressWarnings("serial") - final static class ThreadState extends ReentrantLock { - DocumentsWriterPerThread dwpt; - // TODO this should really be part of DocumentsWriterFlushControl - // write access guarded by DocumentsWriterFlushControl - volatile boolean flushPending = false; - // TODO this should really be part of DocumentsWriterFlushControl - // write access guarded by DocumentsWriterFlushControl - long bytesUsed = 0; - - // set by DocumentsWriter after each indexing op finishes - volatile long lastSeqNo; - - ThreadState(DocumentsWriterPerThread dpwt) { - this.dwpt = dpwt; - } - - private void reset() { - assert this.isHeldByCurrentThread(); - this.dwpt = null; - this.bytesUsed = 0; - this.flushPending = false; - } - - boolean isInitialized() { - assert this.isHeldByCurrentThread(); - return dwpt != null; - } - - /** - * Returns the number of currently active bytes in this ThreadState's - * {@link DocumentsWriterPerThread} - */ - public long getBytesUsedPerThread() { - assert this.isHeldByCurrentThread(); - // public for FlushPolicy - return bytesUsed; - } - - /** - * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread} - */ - public DocumentsWriterPerThread getDocumentsWriterPerThread() { - assert this.isHeldByCurrentThread(); - // public for FlushPolicy - return dwpt; - } - - /** - * Returns <code>true</code> iff this {@link ThreadState} is marked as flush - * pending otherwise <code>false</code> - */ - public boolean isFlushPending() { - return flushPending; - } - } +final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable { - private final List<ThreadState> threadStates = new ArrayList<>(); + private final Set<DocumentsWriterPerThread> dwpts = Collections.newSetFromMap(new IdentityHashMap<>()); + private final List<DocumentsWriterPerThread> freeList = new ArrayList<>(); + private final IOSupplier<DocumentsWriterPerThread> dwptFactory; + private int takenWriterPermits = 0; + private boolean closed; - private final List<ThreadState> freeList = new ArrayList<>(); - private int takenThreadStatePermits = 0; + DocumentsWriterPerThreadPool(IOSupplier<DocumentsWriterPerThread> dwptFactory) { + this.dwptFactory = dwptFactory; + } /** - * Returns the active number of {@link ThreadState} instances. + * Returns the active number of {@link DocumentsWriterPerThread} instances. */ - synchronized int getActiveThreadStateCount() { - return threadStates.size(); + synchronized int size() { + return dwpts.size(); } - synchronized void lockNewThreadStates() { - // this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0 - // any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some + synchronized void lockNewWriters() { + // this is similar to a semaphore - we need to acquire all permits ie. takenWriterPermits must be == 0 + // any call to lockNewWriters() must be followed by unlockNewWriters() otherwise we will deadlock at some // point - assert takenThreadStatePermits >= 0; - takenThreadStatePermits++; + assert takenWriterPermits >= 0; + takenWriterPermits++; } - synchronized void unlockNewThreadStates() { - assert takenThreadStatePermits > 0; - takenThreadStatePermits--; - if (takenThreadStatePermits == 0) { + synchronized void unlockNewWriters() { + assert takenWriterPermits > 0; + takenWriterPermits--; + if (takenWriterPermits == 0) { notifyAll(); } } + /** - * Returns a new {@link ThreadState} iff any new state is available otherwise - * <code>null</code>. - * <p> - * NOTE: the returned {@link ThreadState} is already locked iff non- - * <code>null</code>. - * - * @return a new {@link ThreadState} iff any new state is available otherwise - * <code>null</code> + * Returns a new already locked {@link DocumentsWriterPerThread} + * + * @return a new {@link DocumentsWriterPerThread} */ - private synchronized ThreadState newThreadState() { - assert takenThreadStatePermits >= 0; - while (takenThreadStatePermits > 0) { - // we can't create new thread-states while not all permits are available + private synchronized DocumentsWriterPerThread newWriter() throws IOException { + assert takenWriterPermits >= 0; + while (takenWriterPermits > 0) { + // we can't create new DWPTs while not all permits are available try { wait(); } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } } - ThreadState threadState = new ThreadState(null); - threadState.lock(); // lock so nobody else will get this ThreadState - threadStates.add(threadState); - return threadState; -} - - DocumentsWriterPerThread reset(ThreadState threadState) { - assert threadState.isHeldByCurrentThread(); - final DocumentsWriterPerThread dwpt = threadState.dwpt; - threadState.reset(); + DocumentsWriterPerThread dwpt = dwptFactory.get(); + dwpt.lock(); // lock so nobody else will get this DWPT + dwpts.add(dwpt); return dwpt; } - - void recycle(DocumentsWriterPerThread dwpt) { - // don't recycle DWPT by default - } // TODO: maybe we should try to do load leveling here: we want roughly even numbers // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing - /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */ - ThreadState getAndLock() { - ThreadState threadState = null; + /** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */ + DocumentsWriterPerThread getAndLock() throws IOException { synchronized (this) { - if (freeList.isEmpty()) { - // ThreadState is already locked before return by this method: - return newThreadState(); - } else { - // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a - // limited number of thread states: - threadState = freeList.remove(freeList.size()-1); - - if (threadState.dwpt == null) { - // This thread-state is not initialized, e.g. it - // was just flushed. See if we can instead find - // another free thread state that already has docs - // indexed. This way if incoming thread concurrency - // has decreased, we don't leave docs - // indefinitely buffered, tying up RAM. This - // will instead get those thread states flushed, - // freeing up RAM for larger segment flushes: - for(int i=0;i<freeList.size();i++) { - ThreadState ts = freeList.get(i); - if (ts.dwpt != null) { - // Use this one instead, and swap it with - // the un-initialized one: - freeList.set(i, threadState); - threadState = ts; - break; - } - } + if (closed) { + throw new AlreadyClosedException("DWPTPool is already closed"); + } + // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, + // but has now reduced, we only use a limited number of DWPTs. This also guarantees that if we have suddenly + // a single thread indexing + for (int i = freeList.size()-1; i >= 0; i--) { Review comment: 👍 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org