This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 82de30dd89 Fix the potential access to upsert metadata manager after it is closed (#11692)
82de30dd89 is described below

commit 82de30dd894a8495ded8a4ddbe240846241a5f3b
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Sep 27 09:16:23 2023 -0700

    Fix the potential access to upsert metadata manager after it is closed (#11692)
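
    In rough outline, the fix replaces the volatile _stopped flag and the AtomicInteger pending-operation
    counter with plain fields guarded by the manager's own monitor. The sketch below shows the intended
    pattern only; it is a simplified illustration, and OperationGate is a hypothetical stand-in for
    BasePartitionUpsertMetadataManager, not actual Pinot code:

    // Sketch of the monitor-guarded pending-operation pattern (assumed names).
    public class OperationGate {
      private boolean _stopped;
      // Start at 1 so the gate stays open until stop() releases it
      private int _numPendingOperations = 1;

      public synchronized boolean startOperation() {
        if (_stopped || _numPendingOperations == 0) {
          return false; // refuse new work once stopped or fully drained
        }
        _numPendingOperations++;
        return true;
      }

      public synchronized void finishOperation() {
        if (--_numPendingOperations == 0) {
          notifyAll(); // wake close() once the last in-flight operation ends
        }
      }

      public synchronized void stop() {
        _stopped = true;
        _numPendingOperations--; // release the initial pending operation
      }

      public synchronized void close() throws InterruptedException {
        while (_numPendingOperations != 0) {
          wait(); // block until every startOperation() is matched by a finishOperation()
        }
        // resources can be released safely here
      }
    }

    Callers follow the shape used throughout the diff: bail out with a log message when startOperation()
    returns false, otherwise do the work and call finishOperation() in a finally block after releasing
    any snapshot lock.
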
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 142 ++++++++++-----------
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  58 ++++++++-
 2 files changed, 122 insertions(+), 78 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index e968421537..c286bad125 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -80,10 +79,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected volatile boolean _gotFirstConsumingSegment = false;
   protected final ReadWriteLock _snapshotLock;
 
-  protected volatile boolean _stopped = false;
-  // Initialize with 1 pending operation to indicate the metadata manager can take more operations
-  protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
-
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
   protected int _numOutOfOrderEvents = 0;
 
@@ -91,6 +86,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments.
   protected volatile double _largestSeenComparisonValue;
 
+  // The following variables are always accessed within synchronized block
+  private boolean _stopped;
+  // Initialize with 1 pending operation to indicate the metadata manager can take more operations
+  private int _numPendingOperations = 1;
+  private boolean _closed;
+
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
@@ -123,10 +124,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @Override
   public void addSegment(ImmutableSegment segment) {
     String segmentName = segment.getSegmentName();
-    if (_stopped) {
-      _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
-      return;
-    }
     if (segment instanceof EmptyIndexSegment) {
       _logger.info("Skip adding empty segment: {}", segmentName);
       return;
@@ -158,18 +155,21 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       }
     }
 
+    if (!startOperation()) {
+      _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
+      return;
+    }
     if (_enableSnapshot) {
       _snapshotLock.readLock().lock();
     }
-    startOperation();
     try {
       doAddSegment(immutableSegment);
       _trackedSegments.add(segment);
     } finally {
-      finishOperation();
       if (_enableSnapshot) {
         _snapshotLock.readLock().unlock();
       }
+      finishOperation();
     }
   }
 
@@ -219,24 +219,23 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @Override
   public void preloadSegment(ImmutableSegment segment) {
     String segmentName = segment.getSegmentName();
-    if (_stopped) {
-      _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
-      return;
-    }
     Preconditions.checkArgument(_enableSnapshot, "Snapshot must be enabled to preload segment: {}, table: {}",
         segmentName, _tableNameWithType);
     // Note that EmptyIndexSegment should not reach here either, as it doesn't have validDocIds snapshot.
     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
         "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
         _tableNameWithType);
+    if (!startOperation()) {
+      _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
+      return;
+    }
     _snapshotLock.readLock().lock();
-    startOperation();
     try {
       doPreloadSegment((ImmutableSegmentImpl) segment);
       _trackedSegments.add(segment);
     } finally {
-      finishOperation();
       _snapshotLock.readLock().unlock();
+      finishOperation();
     }
   }
 
@@ -319,16 +318,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   @Override
   public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
-    if (_stopped) {
+    _gotFirstConsumingSegment = true;
+    if (!startOperation()) {
       _logger.debug("Skip adding record to segment: {} because metadata manager is already stopped",
           segment.getSegmentName());
       return;
     }
-
-    // NOTE: We don't acquire snapshot read lock here because snapshot is always taken before a new consuming segment
     //       starts consuming, so it won't overlap with this method
-    _gotFirstConsumingSegment = true;
-    startOperation();
     try {
       doAddRecord(segment, recordInfo);
       _trackedSegments.add(segment);
@@ -341,15 +338,13 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   @Override
   public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
-    if (_stopped) {
+    if (!startOperation()) {
       _logger.info("Skip replacing segment: {} because metadata manager is already stopped", segment.getSegmentName());
       return;
     }
-
     if (_enableSnapshot) {
       _snapshotLock.readLock().lock();
     }
-    startOperation();
     try {
       doReplaceSegment(segment, oldSegment);
       if (!(segment instanceof EmptyIndexSegment)) {
@@ -357,10 +352,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       }
       _trackedSegments.remove(oldSegment);
     } finally {
-      finishOperation();
       if (_enableSnapshot) {
         _snapshotLock.readLock().unlock();
       }
+      finishOperation();
     }
   }
 
@@ -459,15 +454,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @Override
   public void removeSegment(IndexSegment segment) {
     String segmentName = segment.getSegmentName();
-    if (_stopped) {
-      _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
-      return;
-    }
     if (!_trackedSegments.contains(segment)) {
       _logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName);
       return;
     }
-
     // Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
     if (_largestSeenComparisonValue > 0) {
       Number maxComparisonValue =
@@ -477,19 +467,21 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         return;
       }
     }
-
+    if (!startOperation()) {
+      _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
+      return;
+    }
     if (_enableSnapshot) {
       _snapshotLock.readLock().lock();
     }
-    startOperation();
     try {
       doRemoveSegment(segment);
       _trackedSegments.remove(segment);
     } finally {
-      finishOperation();
       if (_enableSnapshot) {
         _snapshotLock.readLock().unlock();
       }
+      finishOperation();
     }
   }
 
@@ -530,12 +522,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     if (_partialUpsertHandler == null) {
       return record;
     }
-    if (_stopped) {
+    if (!startOperation()) {
       _logger.debug("Skip updating record because metadata manager is already stopped");
       return record;
     }
-
-    startOperation();
     try {
       return doUpdateRecord(record, recordInfo);
     } finally {
@@ -566,22 +556,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     if (!_enableSnapshot) {
       return;
     }
-    if (_stopped) {
-      _logger.info("Skip taking snapshot because metadata manager is already stopped");
-      return;
-    }
     if (!_gotFirstConsumingSegment) {
       _logger.info("Skip taking snapshot before getting the first consuming segment");
       return;
     }
-
+    if (!startOperation()) {
+      _logger.info("Skip taking snapshot because metadata manager is already stopped");
+      return;
+    }
     _snapshotLock.writeLock().lock();
-    startOperation();
     try {
       doTakeSnapshot();
     } finally {
-      finishOperation();
       _snapshotLock.writeLock().unlock();
+      finishOperation();
     }
   }
 
@@ -597,8 +585,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       if (segment instanceof ImmutableSegmentImpl) {
         ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot();
         numImmutableSegments++;
-        numPrimaryKeysInSnapshot +=
-            ((ImmutableSegmentImpl) segment).getValidDocIds().getMutableRoaringBitmap().getCardinality();
+        numPrimaryKeysInSnapshot += segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
       }
     }
 
@@ -667,29 +654,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);
   }
 
-  protected void startOperation() {
-    _numPendingOperations.getAndIncrement();
-  }
-
-  protected void finishOperation() {
-    if (_numPendingOperations.decrementAndGet() == 0) {
-      synchronized (_numPendingOperations) {
-        _numPendingOperations.notifyAll();
-      }
-    }
-  }
-
   @Override
   public void removeExpiredPrimaryKeys() {
     if (_metadataTTL <= 0) {
       return;
     }
-    if (_stopped) {
+    if (!startOperation()) {
       _logger.info("Skip removing expired primary keys because metadata manager is already stopped");
       return;
     }
-
-    startOperation();
     try {
       doRemoveExpiredPrimaryKeys();
     } finally {
@@ -702,29 +675,50 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
    */
   protected abstract void doRemoveExpiredPrimaryKeys();
 
+  protected synchronized boolean startOperation() {
+    if (_stopped || _numPendingOperations == 0) {
+      return false;
+    }
+    _numPendingOperations++;
+    return true;
+  }
+
+  protected synchronized void finishOperation() {
+    _numPendingOperations--;
+    if (_numPendingOperations == 0) {
+      notifyAll();
+    }
+  }
+
   @Override
-  public void stop() {
+  public synchronized void stop() {
+    if (_stopped) {
+      _logger.warn("Metadata manager is already stopped");
+      return;
+    }
     _stopped = true;
-    int numPendingOperations = _numPendingOperations.decrementAndGet();
+    _numPendingOperations--;
     _logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}",
-        numPendingOperations, getNumPrimaryKeys());
+        _numPendingOperations, getNumPrimaryKeys());
   }
 
   @Override
-  public void close()
+  public synchronized void close()
       throws IOException {
     Preconditions.checkState(_stopped, "Must stop the metadata manager before closing it");
+    if (_closed) {
+      _logger.warn("Metadata manager is already closed");
+      return;
+    }
+    _closed = true;
     _logger.info("Closing the metadata manager");
-    synchronized (_numPendingOperations) {
-      int numPendingOperations;
-      while ((numPendingOperations = _numPendingOperations.get()) != 0) {
-        _logger.info("Waiting for {} pending operations to finish", numPendingOperations);
-        try {
-          _numPendingOperations.wait();
-        } catch (InterruptedException e) {
-          throw new RuntimeException(
-              String.format("Interrupted while waiting for %d pending operations to finish", numPendingOperations), e);
-        }
+    while (_numPendingOperations != 0) {
+      _logger.info("Waiting for {} pending operations to finish", _numPendingOperations);
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(
+            String.format("Interrupted while waiting for %d pending operations to finish", _numPendingOperations), e);
       }
     }
     doClose();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 363ed18e13..3a1904300b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -49,6 +52,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -59,10 +63,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@@ -83,6 +84,55 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     FileUtils.forceDelete(INDEX_DIR);
   }
 
+  @Test
+  public void testStartFinishOperation() {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
+            mock(ServerMetrics.class));
+
+    // Start 2 operations
+    assertTrue(upsertMetadataManager.startOperation());
+    assertTrue(upsertMetadataManager.startOperation());
+
+    // Stop and close the metadata manager
+    AtomicBoolean stopped = new AtomicBoolean();
+    AtomicBoolean closed = new AtomicBoolean();
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    executor.submit(() -> {
+      upsertMetadataManager.stop();
+      stopped.set(true);
+      try {
+        upsertMetadataManager.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      closed.set(true);
+    });
+    executor.shutdown();
+
+    // Wait for metadata manager to be stopped
+    TestUtils.waitForCondition(aVoid -> stopped.get(), 10_000L, "Failed to stop the metadata manager");
+
+    // Metadata manager should block on close because there are 2 pending operations
+    assertFalse(closed.get());
+
+    // Starting new operation should fail because the metadata manager is already stopped
+    assertFalse(upsertMetadataManager.startOperation());
+
+    // Finish one operation
+    upsertMetadataManager.finishOperation();
+
+    // Metadata manager should still block on close because there is still 1 pending operation
+    assertFalse(closed.get());
+
+    // Finish the other operation
+    upsertMetadataManager.finishOperation();
+
+    // Metadata manager should be closed now
+    TestUtils.waitForCondition(aVoid -> closed.get(), 10_000L, "Failed to close the metadata manager");
+  }
+
   @Test
   public void testAddReplaceRemoveSegment()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
