Jackie-Jiang commented on code in PR #11692: URL: https://github.com/apache/pinot/pull/11692#discussion_r1338053225
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -702,29 +674,50 @@ public void removeExpiredPrimaryKeys() { */ protected abstract void doRemoveExpiredPrimaryKeys(); + protected synchronized boolean startOperation() { + if (_stopped || _numPendingOperations == 0) { + return false; + } + _numPendingOperations++; + return true; + } + + protected synchronized void finishOperation() { + _numPendingOperations--; + if (_numPendingOperations == 0) { + notifyAll(); + } + } + @Override - public void stop() { + public synchronized void stop() { + if (_stopped) { + _logger.warn("Metadata manager is already stopped"); + return; + } _stopped = true; - int numPendingOperations = _numPendingOperations.decrementAndGet(); + _numPendingOperations--; _logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}", - numPendingOperations, getNumPrimaryKeys()); + _numPendingOperations, getNumPrimaryKeys()); } @Override - public void close() + public synchronized void close() throws IOException { Preconditions.checkState(_stopped, "Must stop the metadata manager before closing it"); + if (_closed) { + _logger.warn("Metadata manager is already closed"); + return; + } + _closed = true; _logger.info("Closing the metadata manager"); - synchronized (_numPendingOperations) { - int numPendingOperations; - while ((numPendingOperations = _numPendingOperations.get()) != 0) { - _logger.info("Waiting for {} pending operations to finish", numPendingOperations); - try { - _numPendingOperations.wait(); - } catch (InterruptedException e) { - throw new RuntimeException( - String.format("Interrupted while waiting for %d pending operations to finish", numPendingOperations), e); - } + while (_numPendingOperations != 0) { + _logger.info("Waiting for {} pending operations to finish", _numPendingOperations); + try { + wait(); + } catch (InterruptedException e) { + throw new 
RuntimeException( + String.format("Interrupted while waiting for %d pending operations to finish", _numPendingOperations), e); } } doClose(); Review Comment: `synchronized` is a reentrant lock, so it should never deadlock on itself. We don't want these methods to be called in parallel by multiple threads, so `synchronized` should be the right way to achieve that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org