This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6bf696ec25 Make upsert metadata manager pluggable (#9186) 6bf696ec25 is described below commit 6bf696ec25f2571c252c07099b81f9876f839b91 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Aug 12 02:26:46 2022 -0700 Make upsert metadata manager pluggable (#9186) --- .../manager/realtime/RealtimeTableDataManager.java | 26 +- ...adataAndDictionaryAggregationPlanMakerTest.java | 4 +- .../indexsegment/mutable/MutableSegmentImpl.java | 2 +- .../upsert/BaseTableUpsertMetadataManager.java | 79 ++++ ...ncurrentMapPartitionUpsertMetadataManager.java} | 124 ++---- ...> ConcurrentMapTableUpsertMetadataManager.java} | 36 +- .../upsert/PartitionUpsertMetadataManager.java | 430 +-------------------- .../local/{utils => upsert}/RecordInfo.java | 5 +- .../local/upsert/TableUpsertMetadataManager.java | 38 +- .../upsert/TableUpsertMetadataManagerFactory.java | 65 ++++ .../pinot/segment/local/upsert/UpsertUtils.java | 76 ++++ .../dedup/PartitionDedupMetadataManagerTest.java | 2 +- .../MutableSegmentImplUpsertComparisonColTest.java | 8 +- .../mutable/MutableSegmentImplUpsertTest.java | 7 +- ...rentMapPartitionUpsertMetadataManagerTest.java} | 16 +- .../pinot/spi/config/table/UpsertConfig.java | 24 ++ 16 files changed, 338 insertions(+), 604 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index f51da5195b..af7f366ebd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -57,9 +57,9 @@ import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory; -import org.apache.pinot.segment.local.upsert.PartialUpsertHandler; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory; import org.apache.pinot.segment.local.utils.SchemaUtils; import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils; import org.apache.pinot.segment.spi.ImmutableSegment; @@ -188,26 +188,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { _tableUpsertMetadataManager); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); - - List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); - Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns), - "Primary key columns must be configured for upsert"); - - String comparisonColumn = upsertConfig.getComparisonColumn(); - if (comparisonColumn == null) { - comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName(); - } - - PartialUpsertHandler partialUpsertHandler = null; - if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { - assert upsertConfig.getPartialUpsertStrategies() != null; - partialUpsertHandler = new PartialUpsertHandler(schema, upsertConfig.getPartialUpsertStrategies(), - upsertConfig.getDefaultPartialUpsertStrategy(), comparisonColumn); - } - - _tableUpsertMetadataManager = - new TableUpsertMetadataManager(_tableNameWithType, primaryKeyColumns, comparisonColumn, - upsertConfig.getHashFunction(), partialUpsertHandler, _serverMetrics); + _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics); } } @@ -264,7 +245,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } public boolean isPartialUpsertEnabled() { - return _tableUpsertMetadataManager != null && _tableUpsertMetadataManager.isPartialUpsertEnabled(); + return _tableUpsertMetadataManager != null + && _tableUpsertMetadataManager.getUpsertMode() == UpsertConfig.Mode.PARTIAL; } /* diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java index 4bfb67dfbe..e49b90e657 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java @@ -38,7 +38,7 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; -import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; @@ -125,7 +125,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest { ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class); _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert( - new PartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"), + new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"), "daysSinceEpoch", HashFunction.NONE, null, serverMetrics), new ThreadSafeMutableRoaringBitmap()); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 775fe06f5c..d6de8d0168 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -62,11 +62,11 @@ import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider; import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.RecordInfo; import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.local.utils.IdMap; import org.apache.pinot.segment.local.utils.IngestionUtils; -import org.apache.pinot.segment.local.utils.RecordInfo; import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.MutableSegment; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java new file mode 100644 index 0000000000..95666d3ea2 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.commons.collections.CollectionUtils; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; + + +@ThreadSafe +public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager { + protected String _tableNameWithType; + protected List<String> _primaryKeyColumns; + protected String _comparisonColumn; + protected HashFunction _hashFunction; + protected PartialUpsertHandler _partialUpsertHandler; + protected ServerMetrics _serverMetrics; + + @Override + public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, + ServerMetrics serverMetrics) { + _tableNameWithType = tableConfig.getTableName(); + + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + Preconditions.checkArgument(upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE, + "Upsert must be enabled for table: %s", _tableNameWithType); + + _primaryKeyColumns = schema.getPrimaryKeyColumns(); + Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns), + "Primary key columns must be configured for upsert enabled table: %s", _tableNameWithType); + + _comparisonColumn = upsertConfig.getComparisonColumn(); + if (_comparisonColumn == null) { + _comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName(); + } + + _hashFunction = upsertConfig.getHashFunction(); + + if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { + Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); + Preconditions.checkArgument(partialUpsertStrategies != null, + "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); + _partialUpsertHandler = + new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), + _comparisonColumn); + } + + _serverMetrics = serverMetrics; + } + + @Override + public UpsertConfig.Mode getUpsertMode() { + return _partialUpsertHandler == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java similarity index 84% copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index d1042fe5c1..e6890c8a6c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -38,7 +38,6 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.utils.HashUtils; -import org.apache.pinot.segment.local.utils.RecordInfo; import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; @@ -47,7 +46,6 @@ import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.PrimaryKey; -import org.apache.pinot.spi.utils.ByteArray; import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.slf4j.Logger; @@ -55,32 +53,11 @@ import org.slf4j.LoggerFactory; /** - * Manages the upsert metadata per partition. - * <p>For multiple records with the same comparison value (default to timestamp), the manager will preserve the latest - * record based on the sequence number of the segment. If 2 records with the same comparison value are in the same - * segment, the one with larger doc id will be preserved. Note that for tables with sorted column, the records will be - * re-ordered when committing the segment, and we will use the re-ordered doc ids instead of the ingestion doc ids to - * decide the record to preserve. - * - * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the - * operation is done: - * <ul> - * <li> - * When updating a new record, it first removes the doc id from the current location, then update the new location. - * </li> - * <li> - * When adding a new segment, it removes the doc ids from the current locations before the segment being added to - * the RealtimeTableDataManager. - * </li> - * <li> - * When replacing an existing segment, after the record location being replaced with the new segment, the following - * updates applied to the new segment's valid doc ids won't be reflected to the replaced segment's valid doc ids. - * </li> - * </ul> + * Implementation of {@link PartitionUpsertMetadataManager} that is backed by a {@link ConcurrentHashMap}. */ @SuppressWarnings({"rawtypes", "unchecked"}) @ThreadSafe -public class PartitionUpsertMetadataManager { +public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUpsertMetadataManager { private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1); private final String _tableNameWithType; @@ -105,9 +82,9 @@ public class PartitionUpsertMetadataManager { private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE; private int _numOutOfOrderEvents = 0; - public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, List<String> primaryKeyColumns, - String comparisonColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, - ServerMetrics serverMetrics) { + public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, + List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction, + @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) { _tableNameWithType = tableNameWithType; _partitionId = partitionId; _primaryKeyColumns = primaryKeyColumns; @@ -118,16 +95,12 @@ public class PartitionUpsertMetadataManager { _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); } - /** - * Returns the primary key columns. - */ + @Override public List<String> getPrimaryKeyColumns() { return _primaryKeyColumns; } - /** - * Initializes the upsert metadata for the given immutable segment. - */ + @Override public void addSegment(ImmutableSegment segment) { addSegment(segment, null, null); } @@ -154,7 +127,7 @@ public class PartitionUpsertMetadataManager { validDocIds = new ThreadSafeMutableRoaringBitmap(); } if (recordInfoIterator == null) { - recordInfoIterator = getRecordInfoIterator(segment); + recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn); } addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null); } finally { @@ -169,42 +142,6 @@ public class PartitionUpsertMetadataManager { _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); } - private Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment) { - int numTotalDocs = segment.getSegmentMetadata().getTotalDocs(); - return new Iterator<RecordInfo>() { - private int _docId = 0; - - @Override - public boolean hasNext() { - return _docId < numTotalDocs; - } - - @Override - public RecordInfo next() { - PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]); - getPrimaryKey(segment, _docId, primaryKey); - - Object comparisonValue = segment.getValue(_docId, _comparisonColumn); - if (comparisonValue instanceof byte[]) { - comparisonValue = new ByteArray((byte[]) comparisonValue); - } - return new RecordInfo(primaryKey, _docId++, (Comparable) comparisonValue); - } - }; - } - - private void getPrimaryKey(IndexSegment segment, int docId, PrimaryKey buffer) { - Object[] values = buffer.getValues(); - int numPrimaryKeyColumns = values.length; - for (int i = 0; i < numPrimaryKeyColumns; i++) { - Object value = segment.getValue(docId, _primaryKeyColumns.get(i)); - if (value instanceof byte[]) { - value = new ByteArray((byte[]) value); - } - values[i] = value; - } - } - private void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) { @@ -293,9 +230,7 @@ public class PartitionUpsertMetadataManager { } } - /** - * Updates the upsert metadata for a new consumed record in the given consuming segment. - */ + @Override public void addRecord(MutableSegment segment, RecordInfo recordInfo) { ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), @@ -330,9 +265,7 @@ public class PartitionUpsertMetadataManager { _primaryKeyToRecordLocationMap.size()); } - /** - * Replaces the upsert metadata for the old segment with the new immutable segment. - */ + @Override public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) { replaceSegment(segment, null, null, oldSegment); } @@ -363,7 +296,7 @@ public class PartitionUpsertMetadataManager { validDocIds = new ThreadSafeMutableRoaringBitmap(); } if (recordInfoIterator == null) { - recordInfoIterator = getRecordInfoIterator(segment); + recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn); } addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment, validDocIdsForOldSegment); @@ -402,9 +335,7 @@ public class PartitionUpsertMetadataManager { _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); } - /** - * Removes the upsert metadata for the given segment. - */ + @Override public void removeSegment(IndexSegment segment) { String segmentName = segment.getSegmentName(); _logger.info("Removing {} segment: {}, current primary key count: {}", @@ -446,7 +377,7 @@ public class PartitionUpsertMetadataManager { PeekableIntIterator iterator = validDocIds.getIntIterator(); while (iterator.hasNext()) { int docId = iterator.next(); - getPrimaryKey(segment, docId, primaryKey); + UpsertUtils.getPrimaryKey(segment, _primaryKeyColumns, docId, primaryKey); _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction), (pk, recordLocation) -> { if (recordLocation.getSegment() == segment) { @@ -457,9 +388,7 @@ public class PartitionUpsertMetadataManager { } } - /** - * Returns the merged record when partial-upsert is enabled. - */ + @Override public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) { // Directly return the record when partial-upsert is not enabled if (_partialUpsertHandler == null) { @@ -498,4 +427,29 @@ public class PartitionUpsertMetadataManager { return record; } } + + @VisibleForTesting + static class RecordLocation { + private final IndexSegment _segment; + private final int _docId; + private final Comparable _comparisonValue; + + public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) { + _segment = indexSegment; + _docId = docId; + _comparisonValue = comparisonValue; + } + + public IndexSegment getSegment() { + return _segment; + } + + public int getDocId() { + return _docId; + } + + public Comparable getComparisonValue() { + return _comparisonValue; + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java similarity index 51% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 35dff04a75..67474e145d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -18,33 +18,23 @@ */ package org.apache.pinot.segment.local.upsert; -import org.apache.pinot.segment.spi.IndexSegment; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; /** - * Indicate a record's location on the local host. + * Implementation of {@link TableUpsertMetadataManager} that is backed by a {@link ConcurrentHashMap}. */ -public class RecordLocation { - private final IndexSegment _segment; - private final int _docId; - /** value used to denote the order */ - private final Comparable _comparisonValue; +@ThreadSafe +public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMetadataManager { + private final Map<Integer, ConcurrentMapPartitionUpsertMetadataManager> _partitionMetadataManagerMap = + new ConcurrentHashMap<>(); - public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) { - _segment = indexSegment; - _docId = docId; - _comparisonValue = comparisonValue; - } - - public IndexSegment getSegment() { - return _segment; - } - - public int getDocId() { - return _docId; - } - - public Comparable getComparisonValue() { - return _comparisonValue; + @Override + public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { + return _partitionMetadataManagerMap.computeIfAbsent(partitionId, + k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns, + _comparisonColumn, _hashFunction, _partialUpsertHandler, _serverMetrics)); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index d1042fe5c1..2c5f68df45 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -18,40 +18,12 @@ */ package org.apache.pinot.segment.local.upsert; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.util.Iterator; import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.pinot.common.metrics.ServerGauge; -import org.apache.pinot.common.metrics.ServerMeter; -import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; -import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; -import org.apache.pinot.segment.local.utils.HashUtils; -import org.apache.pinot.segment.local.utils.RecordInfo; -import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; -import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; -import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.PrimaryKey; -import org.apache.pinot.spi.utils.ByteArray; -import org.roaringbitmap.PeekableIntIterator; -import org.roaringbitmap.buffer.MutableRoaringBitmap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -78,424 +50,36 @@ import org.slf4j.LoggerFactory; * </li> * </ul> */ -@SuppressWarnings({"rawtypes", "unchecked"}) @ThreadSafe -public class PartitionUpsertMetadataManager { - private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1); - - private final String _tableNameWithType; - private final int _partitionId; - private final List<String> _primaryKeyColumns; - private final String _comparisonColumn; - private final HashFunction _hashFunction; - private final PartialUpsertHandler _partialUpsertHandler; - private final ServerMetrics _serverMetrics; - private final Logger _logger; - - // TODO(upsert): consider an off-heap KV store to persist this mapping to improve the recovery speed. - @VisibleForTesting - final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); - - @VisibleForTesting - final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet(); - - // Reused for reading previous record during partial upsert - private final GenericRow _reuse = new GenericRow(); - - private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE; - private int _numOutOfOrderEvents = 0; - - public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, List<String> primaryKeyColumns, - String comparisonColumn, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, - ServerMetrics serverMetrics) { - _tableNameWithType = tableNameWithType; - _partitionId = partitionId; - _primaryKeyColumns = primaryKeyColumns; - _comparisonColumn = comparisonColumn; - _hashFunction = hashFunction; - _partialUpsertHandler = partialUpsertHandler; - _serverMetrics = serverMetrics; - _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); - } +public interface PartitionUpsertMetadataManager { /** * Returns the primary key columns. */ - public List<String> getPrimaryKeyColumns() { - return _primaryKeyColumns; - } + List<String> getPrimaryKeyColumns(); /** * Initializes the upsert metadata for the given immutable segment. */ - public void addSegment(ImmutableSegment segment) { - addSegment(segment, null, null); - } - - @VisibleForTesting - void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, - @Nullable Iterator<RecordInfo> recordInfoIterator) { - String segmentName = segment.getSegmentName(); - _logger.info("Adding segment: {}, current primary key count: {}", segmentName, - _primaryKeyToRecordLocationMap.size()); - - if (segment instanceof EmptyIndexSegment) { - _logger.info("Skip adding empty segment: {}", segmentName); - return; - } - - Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); - segmentLock.lock(); - try { - Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, - "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName, - _tableNameWithType); - if (validDocIds == null) { - validDocIds = new ThreadSafeMutableRoaringBitmap(); - } - if (recordInfoIterator == null) { - recordInfoIterator = getRecordInfoIterator(segment); - } - addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null); - } finally { - segmentLock.unlock(); - } - - // Update metrics - int numPrimaryKeys = _primaryKeyToRecordLocationMap.size(); - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - numPrimaryKeys); - - _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); - } - - private Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment) { - int numTotalDocs = segment.getSegmentMetadata().getTotalDocs(); - return new Iterator<RecordInfo>() { - private int _docId = 0; - - @Override - public boolean hasNext() { - return _docId < numTotalDocs; - } - - @Override - public RecordInfo next() { - PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]); - getPrimaryKey(segment, _docId, primaryKey); - - Object comparisonValue = segment.getValue(_docId, _comparisonColumn); - if (comparisonValue instanceof byte[]) { - comparisonValue = new ByteArray((byte[]) comparisonValue); - } - return new RecordInfo(primaryKey, _docId++, (Comparable) comparisonValue); - } - }; - } - - private void getPrimaryKey(IndexSegment segment, int docId, PrimaryKey buffer) { - Object[] values = buffer.getValues(); - int numPrimaryKeyColumns = values.length; - for (int i = 0; i < numPrimaryKeyColumns; i++) { - Object value = segment.getValue(docId, _primaryKeyColumns.get(i)); - if (value instanceof byte[]) { - value = new ByteArray((byte[]) value); - } - values[i] = value; - } - } - - private void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, - Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment, - @Nullable MutableRoaringBitmap validDocIdsForOldSegment) { - String segmentName = segment.getSegmentName(); - segment.enableUpsert(this, validDocIds); - - AtomicInteger numKeysInWrongSegment = new AtomicInteger(); - while (recordInfoIterator.hasNext()) { - RecordInfo recordInfo = recordInfoIterator.next(); - _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), - (primaryKey, currentRecordLocation) -> { - if (currentRecordLocation != null) { - // Existing primary key - IndexSegment currentSegment = currentRecordLocation.getSegment(); - int comparisonResult = - recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue()); - - // The current record is in the same segment - // Update the record location when there is a tie to keep the newer record. Note that the record info - // iterator will return records with incremental doc ids. - if (currentSegment == segment) { - if (comparisonResult >= 0) { - validDocIds.replace(currentRecordLocation.getDocId(), recordInfo.getDocId()); - return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); - } else { - return currentRecordLocation; - } - } - - // The current record is in an old segment being replaced - // This could happen when committing a consuming segment, or reloading a completed segment. In this - // case, we want to update the record location when there is a tie because the record locations should - // point to the new added segment instead of the old segment being replaced. Also, do not update the valid - // doc ids for the old segment because it has not been replaced yet. We pass in an optional valid doc ids - // snapshot for the old segment, which can be updated and used to track the docs not replaced yet. - if (currentSegment == oldSegment) { - if (comparisonResult >= 0) { - validDocIds.add(recordInfo.getDocId()); - if (validDocIdsForOldSegment != null) { - validDocIdsForOldSegment.remove(currentRecordLocation.getDocId()); - } - return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); - } else { - return currentRecordLocation; - } - } - - // This should not happen because the previously replaced segment should have all keys removed. We still - // handle it here, and also track the number of keys not properly replaced previously. - String currentSegmentName = currentSegment.getSegmentName(); - if (currentSegmentName.equals(segmentName)) { - numKeysInWrongSegment.getAndIncrement(); - if (comparisonResult >= 0) { - validDocIds.add(recordInfo.getDocId()); - return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); - } else { - return currentRecordLocation; - } - } - - // The current record is in a different segment - // Update the record location when getting a newer comparison value, or the value is the same as the - // current value, but the segment has a larger sequence number (the segment is newer than the current - // segment). - if (comparisonResult > 0 || (comparisonResult == 0 && LLCSegmentName.isLowLevelConsumerSegmentName( - segmentName) && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName) - && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber( - currentSegmentName))) { - Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId()); - validDocIds.add(recordInfo.getDocId()); - return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); - } else { - return currentRecordLocation; - } - } else { - // New primary key - validDocIds.add(recordInfo.getDocId()); - return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); - } - }); - } - int numKeys = numKeysInWrongSegment.get(); - if (numKeys > 0) { - _logger.warn("Found {} primary keys in the wrong segment when adding segment: {}", numKeys, segmentName); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys); - } - } + void addSegment(ImmutableSegment segment); /** * Updates the upsert metadata for a new consumed record in the given consuming segment. */ - public void addRecord(MutableSegment segment, RecordInfo recordInfo) { - ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); - _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), - (primaryKey, currentRecordLocation) -> { - if (currentRecordLocation != null) { - // Existing primary key - - // Update the record location when the new comparison value is greater than or equal to the current value. - // Update the record location when there is a tie to keep the newer record. - if (recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue()) >= 0) { - IndexSegment currentSegment = currentRecordLocation.getSegment(); - int currentDocId = currentRecordLocation.getDocId(); - if (segment == currentSegment) { - validDocIds.replace(currentDocId, recordInfo.getDocId()); - } else { - Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId); - validDocIds.add(recordInfo.getDocId()); - } - return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); - } else { - return currentRecordLocation; - } - } else { - // New primary key - validDocIds.add(recordInfo.getDocId()); - return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); - } - }); - - // Update metrics - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - _primaryKeyToRecordLocationMap.size()); - } + void addRecord(MutableSegment segment, RecordInfo recordInfo); /** * Replaces the upsert metadata for the old segment with the new immutable segment. */ - public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) { - replaceSegment(segment, null, null, oldSegment); - } - - @VisibleForTesting - void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, - @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) { - String segmentName = segment.getSegmentName(); - Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()), - "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}", - _tableNameWithType, oldSegment.getSegmentName(), segmentName); - _logger.info("Replacing {} segment: {}, current primary key count: {}", - oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, - _primaryKeyToRecordLocationMap.size()); - - Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); - segmentLock.lock(); - try { - MutableRoaringBitmap validDocIdsForOldSegment = - oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null; - if (segment instanceof EmptyIndexSegment) { - _logger.info("Skip adding empty segment: {}", segmentName); - } else { - Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, - "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName, - _tableNameWithType); - if (validDocIds == null) { - validDocIds = new ThreadSafeMutableRoaringBitmap(); - } - if (recordInfoIterator == null) { - recordInfoIterator = getRecordInfoIterator(segment); - } - addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment, - validDocIdsForOldSegment); - } - - if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) { - int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality(); - if (_partialUpsertHandler != null) { - // For partial-upsert table, because we do not restore the original record location when removing the primary - // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a - // consuming segment is replaced by a committed segment that is consumed from a different server with - // different records (some stream consumer cannot guarantee consuming the messages in the same order). - _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This " - + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED, - numKeysNotReplaced); - } else { - _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced, - segmentName); - } - removeSegment(oldSegment, validDocIdsForOldSegment); - } - } finally { - segmentLock.unlock(); - } - - if (!(oldSegment instanceof EmptyIndexSegment)) { - _replacedSegments.add(oldSegment); - } - - // Update metrics - int numPrimaryKeys = _primaryKeyToRecordLocationMap.size(); - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - numPrimaryKeys); - - _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); - } + void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment); /** * Removes the upsert metadata for the given segment. */ - public void removeSegment(IndexSegment segment) { - String segmentName = segment.getSegmentName(); - _logger.info("Removing {} segment: {}, current primary key count: {}", - segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, - _primaryKeyToRecordLocationMap.size()); - - if (_replacedSegments.remove(segment)) { - _logger.info("Skip removing replaced segment: {}", segmentName); - return; - } - - Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); - segmentLock.lock(); - try { - MutableRoaringBitmap validDocIds = - segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null; - if (validDocIds == null || validDocIds.isEmpty()) { - _logger.info("Skip removing segment without valid docs: {}", segmentName); - return; - } - - _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName); - removeSegment(segment, validDocIds); - } finally { - segmentLock.unlock(); - } - - // Update metrics - int numPrimaryKeys = _primaryKeyToRecordLocationMap.size(); - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - numPrimaryKeys); - - _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); - } - - private void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) { - assert !validDocIds.isEmpty(); - PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]); - PeekableIntIterator iterator = validDocIds.getIntIterator(); - while (iterator.hasNext()) { - int docId = iterator.next(); - getPrimaryKey(segment, docId, primaryKey); - _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction), - (pk, recordLocation) -> { - if (recordLocation.getSegment() == segment) { - return null; - } - return recordLocation; - }); - } - } + void removeSegment(IndexSegment segment); /** * Returns the merged record when partial-upsert is enabled. */ - public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) { - // Directly return the record when partial-upsert is not enabled - if (_partialUpsertHandler == null) { - return record; - } - - AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>(); - RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent( - HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> { - if (recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) { - _reuse.clear(); - previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(), _reuse)); - } - return recordLocation; - }); - if (currentRecordLocation != null) { - // Existing primary key - GenericRow previousRecord = previousRecordReference.get(); - if (previousRecord != null) { - return _partialUpsertHandler.merge(previousRecord, record); - } else { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L); - _numOutOfOrderEvents++; - long currentTimeNs = System.nanoTime(); - if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) { - _logger.warn("Skipped {} out-of-order events for partial-upsert table (the last event has current comparison " - + "value: {}, record comparison value: {})", _numOutOfOrderEvents, - currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue()); - _lastOutOfOrderEventReportTimeNs = currentTimeNs; - _numOutOfOrderEvents = 0; - } - return record; - } - } else { - // New primary key - return record; - } - } + GenericRow updateRecord(GenericRow record, RecordInfo recordInfo); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java similarity index 92% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java index 5e235c86e0..f4f139f6c3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.utils; +package org.apache.pinot.segment.local.upsert; import org.apache.pinot.spi.data.readers.PrimaryKey; -public final class RecordInfo { +@SuppressWarnings("rawtypes") +public class RecordInfo { private final PrimaryKey _primaryKey; private final int _docId; private final Comparable _comparisonValue; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index 108438e95e..ffafb999ee 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -18,45 +18,23 @@ */ package org.apache.pinot.segment.local.upsert; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; /** * The manager of the upsert metadata of a table. */ @ThreadSafe -public class TableUpsertMetadataManager { - private final Map<Integer, PartitionUpsertMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>(); - private final String _tableNameWithType; - private final List<String> _primaryKeyColumns; - private final String _comparisonColumn; - private final HashFunction _hashFunction; - private final PartialUpsertHandler _partialUpsertHandler; - private final ServerMetrics _serverMetrics; +public interface TableUpsertMetadataManager { - public TableUpsertMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, String comparisonColumn, - HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) { - _tableNameWithType = tableNameWithType; - _primaryKeyColumns = primaryKeyColumns; - _comparisonColumn = comparisonColumn; - _hashFunction = hashFunction; - _partialUpsertHandler = partialUpsertHandler; - _serverMetrics = serverMetrics; - } + void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics); - public PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { - return _partitionMetadataManagerMap.computeIfAbsent(partitionId, - k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns, _comparisonColumn, - _hashFunction, _partialUpsertHandler, _serverMetrics)); - } + ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId); - public boolean isPartialUpsertEnabled() { - return _partialUpsertHandler != null; - } + UpsertConfig.Mode getUpsertMode(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java new file mode 100644 index 0000000000..989f5a1e2c --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TableUpsertMetadataManagerFactory { + private TableUpsertMetadataManagerFactory() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class); + + public static TableUpsertMetadataManager create(TableConfig tableConfig, Schema schema, + TableDataManager tableDataManager, ServerMetrics serverMetrics) { + String tableNameWithType = tableConfig.getTableName(); + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + Preconditions.checkArgument(upsertConfig != null, "Must provide upsert config for table: %s", tableNameWithType); + + TableUpsertMetadataManager metadataManager; + String metadataManagerClass = upsertConfig.getMetadataManagerClass(); + if (StringUtils.isNotEmpty(metadataManagerClass)) { + LOGGER.info("Creating TableUpsertMetadataManager with class: {} for table: {}", metadataManagerClass, + tableNameWithType); + try { + metadataManager = + (TableUpsertMetadataManager) Class.forName(metadataManagerClass).getConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException( + String.format("Caught exception constructing TableUpsertMetadataManager with class: %s for table: %s", + metadataManagerClass, tableNameWithType)); + } + } else { + LOGGER.info("Creating ConcurrentMapTableUpsertMetadataManager for table: {}", tableNameWithType); + metadataManager = new ConcurrentMapTableUpsertMetadataManager(); + } + + metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics); + return metadataManager; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java new file mode 100644 index 0000000000..a7ea1b92f7 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import java.util.Iterator; +import java.util.List; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.ByteArray; + + +@SuppressWarnings("rawtypes") +public class UpsertUtils { + private UpsertUtils() { + } + + /** + * Returns an iterator of {@link RecordInfo} from the segment. + */ + public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment, List<String> primaryKeyColumns, + String comparisonColumn) { + int numTotalDocs = segment.getSegmentMetadata().getTotalDocs(); + return new Iterator<RecordInfo>() { + private int _docId = 0; + + @Override + public boolean hasNext() { + return _docId < numTotalDocs; + } + + @Override + public RecordInfo next() { + PrimaryKey primaryKey = new PrimaryKey(new Object[primaryKeyColumns.size()]); + getPrimaryKey(segment, primaryKeyColumns, _docId, primaryKey); + + Object comparisonValue = segment.getValue(_docId, comparisonColumn); + if (comparisonValue instanceof byte[]) { + comparisonValue = new ByteArray((byte[]) comparisonValue); + } + return new RecordInfo(primaryKey, _docId++, (Comparable) comparisonValue); + } + }; + } + + /** + * Reads a primary key from the segment. + */ + public static void getPrimaryKey(IndexSegment segment, List<String> primaryKeyColumns, int docId, PrimaryKey buffer) { + Object[] values = buffer.getValues(); + int numPrimaryKeyColumns = values.length; + for (int i = 0; i < numPrimaryKeyColumns; i++) { + Object value = segment.getValue(docId, primaryKeyColumns.get(i)); + if (value instanceof byte[]) { + value = new ByteArray((byte[]) value); + } + values[i] = value; + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java index 39f7d6ae98..ca305278ca 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java @@ -25,8 +25,8 @@ import java.util.Map; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; +import org.apache.pinot.segment.local.upsert.RecordInfo; import org.apache.pinot.segment.local.utils.HashUtils; -import org.apache.pinot.segment.local.utils.RecordInfo; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.data.readers.PrimaryKey; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java index 08cbf3e91c..a8d8e956c4 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java @@ -22,10 +22,10 @@ import java.io.File; import java.net.URL; import java.util.Collections; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; -import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager; -import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -66,8 +66,8 @@ public class MutableSegmentImplUpsertComparisonColTest { _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); _partitionUpsertMetadataManager = - new TableUpsertMetadataManager("testTable_REALTIME", _schema.getPrimaryKeyColumns(), "offset", - HashFunction.NONE, null, mock(ServerMetrics.class)).getOrCreatePartitionManager(0); + TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class), + mock(ServerMetrics.class)).getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch", diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index 015ba5705a..a2439d9f8f 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -22,9 +22,10 @@ import java.io.File; import java.net.URL; import java.util.Collections; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; -import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -64,8 +65,8 @@ public class MutableSegmentImplUpsertTest { _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); _partitionUpsertMetadataManager = - new TableUpsertMetadataManager("testTable_REALTIME", _schema.getPrimaryKeyColumns(), "secondsSinceEpoch", - hashFunction, null, mock(ServerMetrics.class)).getOrCreatePartitionManager(0); + TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class), + mock(ServerMetrics.class)).getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), false, true, upsertConfigWithHash, "secondsSinceEpoch", diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java similarity index 96% rename from pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java rename to pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index aa1392d6df..5a6e45573c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -26,8 +26,8 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; +import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation; import org.apache.pinot.segment.local.utils.HashUtils; -import org.apache.pinot.segment.local.utils.RecordInfo; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; @@ -49,7 +49,7 @@ import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; -public class PartitionUpsertMetadataManagerTest { +public class ConcurrentMapPartitionUpsertMetadataManagerTest { private static final String RAW_TABLE_NAME = "testTable"; private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); @@ -61,9 +61,9 @@ public class PartitionUpsertMetadataManagerTest { } private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) { - PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol", - hashFunction, null, mock(ServerMetrics.class)); + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), + "timeCol", hashFunction, null, mock(ServerMetrics.class)); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment @@ -231,9 +231,9 @@ public class PartitionUpsertMetadataManagerTest { } private void verifyAddRecord(HashFunction hashFunction) { - PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol", - hashFunction, null, mock(ServerMetrics.class)); + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), + "timeCol", hashFunction, null, mock(ServerMetrics.class)); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index f687be7501..162e6b3030 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -54,6 +54,12 @@ public class UpsertConfig extends BaseJsonConfig { @JsonPropertyDescription("Column for upsert comparison, default to time column") private String _comparisonColumn; + @JsonPropertyDescription("Custom class for upsert metadata manager") + private String _metadataManagerClass; + + @JsonPropertyDescription("Custom configs for upsert metadata manager") + private Map<String, String> _metadataManagerConfigs; + @Deprecated public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode, @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> partialUpsertStrategies, @@ -105,6 +111,16 @@ public class UpsertConfig extends BaseJsonConfig { return _comparisonColumn; } + @Nullable + public String getMetadataManagerClass() { + return _metadataManagerClass; + } + + @Nullable + public Map<String, String> getMetadataManagerConfigs() { + return _metadataManagerConfigs; + } + public void setHashFunction(HashFunction hashFunction) { _hashFunction = hashFunction; } @@ -136,4 +152,12 @@ public class UpsertConfig extends BaseJsonConfig { public void setComparisonColumn(String comparisonColumn) { _comparisonColumn = comparisonColumn; } + + public void setMetadataManagerClass(String metadataManagerClass) { + _metadataManagerClass = metadataManagerClass; + } + + public void setMetadataManagerConfigs(Map<String, String> metadataManagerConfigs) { + _metadataManagerConfigs = metadataManagerConfigs; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org