This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9c1bb02dec Misc fixes for upsert metadata manager (#12319) 9c1bb02dec is described below commit 9c1bb02decc32f5e685c69667a87e1bf7621fb2e Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Jan 24 15:59:12 2024 -0800 Misc fixes for upsert metadata manager (#12319) --- .../upsert/BasePartitionUpsertMetadataManager.java | 52 +++++++++++----------- ...oncurrentMapPartitionUpsertMetadataManager.java | 43 ++++++++---------- 2 files changed, 45 insertions(+), 50 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index b63f58e013..aca199659d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AtomicDouble; import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -91,7 +92,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Used to maintain the largestSeenComparisonValue to avoid handling out-of-ttl segments/records. // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments. - protected volatile double _largestSeenComparisonValue; + protected final AtomicDouble _largestSeenComparisonValue; // The following variables are always accessed within synchronized block private boolean _stopped; @@ -116,9 +117,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps _serverMetrics = ServerMetrics.get(); _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); if (_metadataTTL > 0) { - _largestSeenComparisonValue = loadWatermark(); + _largestSeenComparisonValue = new AtomicDouble(loadWatermark()); } else { - _largestSeenComparisonValue = Double.MIN_VALUE; + _largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE); deleteWatermark(); } } @@ -166,17 +167,17 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps double maxComparisonValue = ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)) .getMaxValue()).doubleValue(); - _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, maxComparisonValue); + _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue)); } // Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL) - if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) { + if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) { Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled"); Preconditions.checkState(_comparisonColumns.size() == 1, "Upsert TTL does not work with multiple comparison columns"); Number maxComparisonValue = (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); - if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) { + if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) { _logger.info("Skip adding segment: {} because it's out of TTL", segmentName); MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot(); if (validDocIdsSnapshot != null) { @@ -245,11 +246,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Update metrics long numPrimaryKeys = getNumPrimaryKeys(); + updatePrimaryKeyGauge(numPrimaryKeys); + _logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName, + System.currentTimeMillis() - startTimeMs, numPrimaryKeys); + } + + protected abstract long getNumPrimaryKeys(); + + protected void updatePrimaryKeyGauge(long numPrimaryKeys) { _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, numPrimaryKeys); + } - _logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName, - System.currentTimeMillis() - startTimeMs, numPrimaryKeys); + protected void updatePrimaryKeyGauge() { + updatePrimaryKeyGauge(getNumPrimaryKeys()); } @Override @@ -275,7 +285,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } - private void doPreloadSegment(ImmutableSegmentImpl segment) { + protected void doPreloadSegment(ImmutableSegmentImpl segment) { String segmentName = segment.getSegmentName(); _logger.info("Preloading segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys()); long startTimeMs = System.currentTimeMillis(); @@ -301,8 +311,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Update metrics long numPrimaryKeys = getNumPrimaryKeys(); - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - numPrimaryKeys); + updatePrimaryKeyGauge(numPrimaryKeys); _logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys); } @@ -347,8 +356,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } - protected abstract long getNumPrimaryKeys(); - protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment); @@ -378,8 +385,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } /** - Returns {@code true} when the record is added to the upsert metadata manager, - {@code false} when the record is out-of-order thus not added. + * Returns {@code true} when the record is added to the upsert metadata manager, {@code false} when the record is + * out-of-order thus not added. */ protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo); @@ -433,9 +440,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Update metrics long numPrimaryKeys = getNumPrimaryKeys(); - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - numPrimaryKeys); - + updatePrimaryKeyGauge(numPrimaryKeys); _logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys); } @@ -506,10 +511,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps return; } // Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL) - if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) { + if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) { Number maxComparisonValue = (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); - if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) { + if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) { _logger.info("Skip removing segment: {} because it's out of TTL", segmentName); return; } @@ -556,9 +561,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Update metrics long numPrimaryKeys = getNumPrimaryKeys(); - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - numPrimaryKeys); - + updatePrimaryKeyGauge(numPrimaryKeys); _logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys); } @@ -793,8 +796,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // We don't remove the segment from the metadata manager when // it's closed. This was done to make table deletion faster. Since we don't remove the segment, we never decrease // the primary key count. So, we set the primary key count to 0 here. - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - 0L); + updatePrimaryKeyGauge(0); _logger.info("Closed the metadata manager"); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 576d679368..887582538c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; @@ -223,22 +222,22 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp @Override public void doRemoveExpiredPrimaryKeys() { - AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger(); AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger(); + AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger(); + double largestSeenComparisonValue = _largestSeenComparisonValue.get(); double metadataTTLKeysThreshold; if (_metadataTTL > 0) { - metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL; + metadataTTLKeysThreshold = largestSeenComparisonValue - _metadataTTL; } else { metadataTTLKeysThreshold = Double.MIN_VALUE; } - double deletedKeysThreshold; - if (_deletedKeysTTL > 0) { - deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL; + deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL; } else { deletedKeysThreshold = Double.MIN_VALUE; } + _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> { double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue(); if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) { @@ -255,29 +254,25 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } }); if (_metadataTTL > 0) { - persistWatermark(_largestSeenComparisonValue); + persistWatermark(largestSeenComparisonValue); } - int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get(); - if (numDeletedTTLKeys > 0) { - _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}", numDeletedTTLKeys, - _tableNameWithType); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED, - numDeletedTTLKeys); - } + // Update metrics + updatePrimaryKeyGauge(); int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get(); if (numMetadataTTLKeys > 0) { - _logger.info("Deleted {} primary keys based on metadataTTL in the table {}", numMetadataTTLKeys, - _tableNameWithType); + _logger.info("Deleted {} primary keys based on metadataTTL", numMetadataTTLKeys); _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED, numMetadataTTLKeys); } + int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get(); + if (numDeletedTTLKeys > 0) { + _logger.info("Deleted {} primary keys based on deletedKeysTTL", numDeletedTTLKeys); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED, + numDeletedTTLKeys); + } } - /** - Returns {@code true} when the record is added to the upsert metadata manager, - {@code false} when the record is out-of-order thus not added. - */ @Override protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false); @@ -289,7 +284,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp // When TTL is enabled, update largestSeenComparisonValue when adding new record if (_metadataTTL > 0 || _deletedKeysTTL > 0) { double comparisonValue = ((Number) newComparisonValue).doubleValue(); - _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue); + _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, comparisonValue)); } _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), @@ -310,8 +305,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } return new RecordLocation(segment, newDocId, newComparisonValue); } else { + // Out-of-order record handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue()); - // this is a out-of-order record then set value to true - this indicates whether out-of-order or not isOutOfOrderRecord.set(true); return currentRecordLocation; } @@ -322,9 +317,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } }); - // Update metrics - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - _primaryKeyToRecordLocationMap.size()); + updatePrimaryKeyGauge(); return !isOutOfOrderRecord.get(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org