klsince commented on code in PR #11692:
URL: https://github.com/apache/pinot/pull/11692#discussion_r1338041774


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -702,29 +674,50 @@ public void removeExpiredPrimaryKeys() {
    */
   protected abstract void doRemoveExpiredPrimaryKeys();
 
+  protected synchronized boolean startOperation() {
+    if (_stopped || _numPendingOperations == 0) {
+      return false;
+    }
+    _numPendingOperations++;
+    return true;
+  }
+
+  protected synchronized void finishOperation() {
+    _numPendingOperations--;
+    if (_numPendingOperations == 0) {
+      notifyAll();
+    }
+  }
+
   @Override
-  public void stop() {
+  public synchronized void stop() {
+    if (_stopped) {
+      _logger.warn("Metadata manager is already stopped");
+      return;
+    }
     _stopped = true;
-    int numPendingOperations = _numPendingOperations.decrementAndGet();
+    _numPendingOperations--;

Review Comment:
   I believe stop() and close() is called by single thread one after another, 
otherwise, we may get this race condition: thread1 called close(), then thread2 
called stop(), because we don't call notifyAll() in stop() after 
`_numPendingOperations` becomes 0, the thread1 would wait forever.
   
   to be safe, we can this in stop() method as well:
   ```
    if (_numPendingOperations == 0) {
         notifyAll();
       }
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -702,29 +674,50 @@ public void removeExpiredPrimaryKeys() {
    */
   protected abstract void doRemoveExpiredPrimaryKeys();
 
+  protected synchronized boolean startOperation() {
+    if (_stopped || _numPendingOperations == 0) {
+      return false;
+    }
+    _numPendingOperations++;
+    return true;
+  }
+
+  protected synchronized void finishOperation() {
+    _numPendingOperations--;
+    if (_numPendingOperations == 0) {
+      notifyAll();
+    }
+  }
+
   @Override
-  public void stop() {
+  public synchronized void stop() {
+    if (_stopped) {
+      _logger.warn("Metadata manager is already stopped");
+      return;
+    }
     _stopped = true;
-    int numPendingOperations = _numPendingOperations.decrementAndGet();
+    _numPendingOperations--;
     _logger.info("Stopped the metadata manager with {} pending operations, 
current primary key count: {}",
-        numPendingOperations, getNumPrimaryKeys());
+        _numPendingOperations, getNumPrimaryKeys());
   }
 
   @Override
-  public void close()
+  public synchronized void close()
       throws IOException {
     Preconditions.checkState(_stopped, "Must stop the metadata manager before 
closing it");
+    if (_closed) {
+      _logger.warn("Metadata manager is already closed");
+      return;
+    }
+    _closed = true;
     _logger.info("Closing the metadata manager");
-    synchronized (_numPendingOperations) {
-      int numPendingOperations;
-      while ((numPendingOperations = _numPendingOperations.get()) != 0) {
-        _logger.info("Waiting for {} pending operations to finish", 
numPendingOperations);
-        try {
-          _numPendingOperations.wait();
-        } catch (InterruptedException e) {
-          throw new RuntimeException(
-              String.format("Interrupted while waiting for %d pending 
operations to finish", numPendingOperations), e);
-        }
+    while (_numPendingOperations != 0) {
+      _logger.info("Waiting for {} pending operations to finish", 
_numPendingOperations);
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(
+            String.format("Interrupted while waiting for %d pending operations 
to finish", _numPendingOperations), e);
       }
     }
     doClose();

Review Comment:
   doClose() is now inside the critical section. any concern about this? would 
it call any sync'ed methods on this object, leading to potential deadlock?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -80,17 +79,18 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected volatile boolean _gotFirstConsumingSegment = false;
   protected final ReadWriteLock _snapshotLock;
 
-  protected volatile boolean _stopped = false;
-  // Initialize with 1 pending operation to indicate the metadata manager can 
take more operations
-  protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
-
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
   protected int _numOutOfOrderEvents = 0;
 
   // Used to maintain the largestSeenComparisonValue to avoid handling 
out-of-ttl segments/records.
   // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to 
compute expired segments.
   protected volatile double _largestSeenComparisonValue;
 
+  private boolean _stopped;

Review Comment:
   add a comment that those 3 object states are sync'ed with object's intrinsic 
lock and condition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to