This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 82de30dd89 Fix the potential access to upsert metadata manager after it is closed (#11692) 82de30dd89 is described below commit 82de30dd894a8495ded8a4ddbe240846241a5f3b Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Sep 27 09:16:23 2023 -0700 Fix the potential access to upsert metadata manager after it is closed (#11692) --- .../upsert/BasePartitionUpsertMetadataManager.java | 142 ++++++++++----------- ...rrentMapPartitionUpsertMetadataManagerTest.java | 58 ++++++++- 2 files changed, 122 insertions(+), 78 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index e968421537..c286bad125 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -80,10 +79,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected volatile boolean _gotFirstConsumingSegment = false; protected final ReadWriteLock _snapshotLock; - protected volatile boolean _stopped = false; - // Initialize with 1 pending operation to indicate the metadata manager can take more operations - protected final AtomicInteger _numPendingOperations = new AtomicInteger(1); - protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE; protected int _numOutOfOrderEvents = 0; @@ -91,6 +86,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments. protected volatile double _largestSeenComparisonValue; + // The following variables are always accessed within synchronized block + private boolean _stopped; + // Initialize with 1 pending operation to indicate the metadata manager can take more operations + private int _numPendingOperations = 1; + private boolean _closed; + protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, @@ -123,10 +124,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void addSegment(ImmutableSegment segment) { String segmentName = segment.getSegmentName(); - if (_stopped) { - _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName()); - return; - } if (segment instanceof EmptyIndexSegment) { _logger.info("Skip adding empty segment: {}", segmentName); return; @@ -158,18 +155,21 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } + if (!startOperation()) { + _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName()); + return; + } if (_enableSnapshot) { _snapshotLock.readLock().lock(); } - startOperation(); try { doAddSegment(immutableSegment); _trackedSegments.add(segment); } finally { - finishOperation(); if (_enableSnapshot) { _snapshotLock.readLock().unlock(); } + finishOperation(); } } @@ -219,24 +219,23 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void preloadSegment(ImmutableSegment segment) { String segmentName = segment.getSegmentName(); - if (_stopped) { - _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName); - return; - } Preconditions.checkArgument(_enableSnapshot, "Snapshot must be enabled to preload segment: {}, table: {}", segmentName, _tableNameWithType); // Note that EmptyIndexSegment should not reach here either, as it doesn't have validDocIds snapshot. Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName, _tableNameWithType); + if (!startOperation()) { + _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName); + return; + } _snapshotLock.readLock().lock(); - startOperation(); try { doPreloadSegment((ImmutableSegmentImpl) segment); _trackedSegments.add(segment); } finally { - finishOperation(); _snapshotLock.readLock().unlock(); + finishOperation(); } } @@ -319,16 +318,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void addRecord(MutableSegment segment, RecordInfo recordInfo) { - if (_stopped) { + _gotFirstConsumingSegment = true; + if (!startOperation()) { _logger.debug("Skip adding record to segment: {} because metadata manager is already stopped", segment.getSegmentName()); return; } - // NOTE: We don't acquire snapshot read lock here because snapshot is always taken before a new consuming segment // starts consuming, so it won't overlap with this method - _gotFirstConsumingSegment = true; - startOperation(); try { doAddRecord(segment, recordInfo); _trackedSegments.add(segment); @@ -341,15 +338,13 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) { - if (_stopped) { + if (!startOperation()) { _logger.info("Skip replacing segment: {} because metadata manager is already stopped", segment.getSegmentName()); return; } - if (_enableSnapshot) { _snapshotLock.readLock().lock(); } - startOperation(); try { doReplaceSegment(segment, oldSegment); if (!(segment instanceof EmptyIndexSegment)) { @@ -357,10 +352,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } _trackedSegments.remove(oldSegment); } finally { - finishOperation(); if (_enableSnapshot) { _snapshotLock.readLock().unlock(); } + finishOperation(); } } @@ -459,15 +454,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void removeSegment(IndexSegment segment) { String segmentName = segment.getSegmentName(); - if (_stopped) { - _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName); - return; - } if (!_trackedSegments.contains(segment)) { _logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName); return; } - // Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL) if (_largestSeenComparisonValue > 0) { Number maxComparisonValue = @@ -477,19 +467,21 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps return; } } - + if (!startOperation()) { + _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName); + return; + } if (_enableSnapshot) { _snapshotLock.readLock().lock(); } - startOperation(); try { doRemoveSegment(segment); _trackedSegments.remove(segment); } finally { - finishOperation(); if (_enableSnapshot) { _snapshotLock.readLock().unlock(); } + finishOperation(); } } @@ -530,12 +522,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps if (_partialUpsertHandler == null) { return record; } - if (_stopped) { + if (!startOperation()) { _logger.debug("Skip updating record because metadata manager is already stopped"); return record; } - - startOperation(); try { return doUpdateRecord(record, recordInfo); } finally { @@ -566,22 +556,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps if (!_enableSnapshot) { return; } - if (_stopped) { - _logger.info("Skip taking snapshot because metadata manager is already stopped"); - return; - } if (!_gotFirstConsumingSegment) { _logger.info("Skip taking snapshot before getting the first consuming segment"); return; } - + if (!startOperation()) { + _logger.info("Skip taking snapshot because metadata manager is already stopped"); + return; + } _snapshotLock.writeLock().lock(); - startOperation(); try { doTakeSnapshot(); } finally { - finishOperation(); _snapshotLock.writeLock().unlock(); + finishOperation(); } } @@ -597,8 +585,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps if (segment instanceof ImmutableSegmentImpl) { ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(); numImmutableSegments++; - numPrimaryKeysInSnapshot += - ((ImmutableSegmentImpl) segment).getValidDocIds().getMutableRoaringBitmap().getCardinality(); + numPrimaryKeysInSnapshot += segment.getValidDocIds().getMutableRoaringBitmap().getCardinality(); } } @@ -667,29 +654,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId); } - protected void startOperation() { - _numPendingOperations.getAndIncrement(); - } - - protected void finishOperation() { - if (_numPendingOperations.decrementAndGet() == 0) { - synchronized (_numPendingOperations) { - _numPendingOperations.notifyAll(); - } - } - } - @Override public void removeExpiredPrimaryKeys() { if (_metadataTTL <= 0) { return; } - if (_stopped) { + if (!startOperation()) { _logger.info("Skip removing expired primary keys because metadata manager is already stopped"); return; } - - startOperation(); try { doRemoveExpiredPrimaryKeys(); } finally { @@ -702,29 +675,50 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps */ protected abstract void doRemoveExpiredPrimaryKeys(); + protected synchronized boolean startOperation() { + if (_stopped || _numPendingOperations == 0) { + return false; + } + _numPendingOperations++; + return true; + } + + protected synchronized void finishOperation() { + _numPendingOperations--; + if (_numPendingOperations == 0) { + notifyAll(); + } + } + @Override - public void stop() { + public synchronized void stop() { + if (_stopped) { + _logger.warn("Metadata manager is already stopped"); + return; + } _stopped = true; - int numPendingOperations = _numPendingOperations.decrementAndGet(); + _numPendingOperations--; _logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}", - numPendingOperations, getNumPrimaryKeys()); + _numPendingOperations, getNumPrimaryKeys()); } @Override - public void close() + public synchronized void close() throws IOException { Preconditions.checkState(_stopped, "Must stop the metadata manager before closing it"); + if (_closed) { + _logger.warn("Metadata manager is already closed"); + return; + } + _closed = true; _logger.info("Closing the metadata manager"); - synchronized (_numPendingOperations) { - int numPendingOperations; - while ((numPendingOperations = _numPendingOperations.get()) != 0) { - _logger.info("Waiting for {} pending operations to finish", numPendingOperations); - try { - _numPendingOperations.wait(); - } catch (InterruptedException e) { - throw new RuntimeException( - String.format("Interrupted while waiting for %d pending operations to finish", numPendingOperations), e); - } + while (_numPendingOperations != 0) { + _logger.info("Waiting for {} pending operations to finish", _numPendingOperations); + try { + wait(); + } catch (InterruptedException e) { + throw new RuntimeException( + String.format("Interrupted while waiting for %d pending operations to finish", _numPendingOperations), e); } } doClose(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 363ed18e13..3a1904300b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -27,6 +27,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metrics.ServerMetrics; @@ -49,6 +52,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -59,10 +63,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertSame; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class ConcurrentMapPartitionUpsertMetadataManagerTest { @@ -83,6 +84,55 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { FileUtils.forceDelete(INDEX_DIR); } + @Test + public void testStartFinishOperation() { + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), + Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR, + mock(ServerMetrics.class)); + + // Start 2 operations + assertTrue(upsertMetadataManager.startOperation()); + assertTrue(upsertMetadataManager.startOperation()); + + // Stop and close the metadata manager + AtomicBoolean stopped = new AtomicBoolean(); + AtomicBoolean closed = new AtomicBoolean(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> { + upsertMetadataManager.stop(); + stopped.set(true); + try { + upsertMetadataManager.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + closed.set(true); + }); + executor.shutdown(); + + // Wait for metadata manager to be stopped + TestUtils.waitForCondition(aVoid -> stopped.get(), 10_000L, "Failed to stop the metadata manager"); + + // Metadata manager should block on close because there are 2 pending operations + assertFalse(closed.get()); + + // Starting new operation should fail because the metadata manager is already stopped + assertFalse(upsertMetadataManager.startOperation()); + + // Finish one operation + upsertMetadataManager.finishOperation(); + + // Metadata manager should still block on close because there is still 1 pending operation + assertFalse(closed.get()); + + // Finish the other operation + upsertMetadataManager.finishOperation(); + + // Metadata manager should be closed now + TestUtils.waitForCondition(aVoid -> closed.get(), 10_000L, "Failed to close the metadata manager"); + } + @Test public void testAddReplaceRemoveSegment() throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org