Jackie-Jiang commented on code in PR #11692:
URL: https://github.com/apache/pinot/pull/11692#discussion_r1338053225


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -702,29 +674,50 @@ public void removeExpiredPrimaryKeys() {
    */
   protected abstract void doRemoveExpiredPrimaryKeys();
 
+  protected synchronized boolean startOperation() {
+    if (_stopped || _numPendingOperations == 0) {
+      return false;
+    }
+    _numPendingOperations++;
+    return true;
+  }
+
+  protected synchronized void finishOperation() {
+    _numPendingOperations--;
+    if (_numPendingOperations == 0) {
+      notifyAll();
+    }
+  }
+
   @Override
-  public void stop() {
+  public synchronized void stop() {
+    if (_stopped) {
+      _logger.warn("Metadata manager is already stopped");
+      return;
+    }
     _stopped = true;
-    int numPendingOperations = _numPendingOperations.decrementAndGet();
+    _numPendingOperations--;
     _logger.info("Stopped the metadata manager with {} pending operations, 
current primary key count: {}",
-        numPendingOperations, getNumPrimaryKeys());
+        _numPendingOperations, getNumPrimaryKeys());
   }
 
   @Override
-  public void close()
+  public synchronized void close()
       throws IOException {
     Preconditions.checkState(_stopped, "Must stop the metadata manager before 
closing it");
+    if (_closed) {
+      _logger.warn("Metadata manager is already closed");
+      return;
+    }
+    _closed = true;
     _logger.info("Closing the metadata manager");
-    synchronized (_numPendingOperations) {
-      int numPendingOperations;
-      while ((numPendingOperations = _numPendingOperations.get()) != 0) {
-        _logger.info("Waiting for {} pending operations to finish", 
numPendingOperations);
-        try {
-          _numPendingOperations.wait();
-        } catch (InterruptedException e) {
-          throw new RuntimeException(
-              String.format("Interrupted while waiting for %d pending 
operations to finish", numPendingOperations), e);
-        }
+    while (_numPendingOperations != 0) {
+      _logger.info("Waiting for {} pending operations to finish", 
_numPendingOperations);
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(
+            String.format("Interrupted while waiting for %d pending operations 
to finish", _numPendingOperations), e);
       }
     }
     doClose();

Review Comment:
   `synchronized` is a reentrant lock, so it should never deadlock on itself. We 
don't want these methods to be called in parallel by multiple threads, so 
`synchronized` should be the right way to achieve that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to