This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 92e26a5847 Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120) 92e26a5847 is described below commit 92e26a584723e02927fb25819c07174f2feae594 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Dec 11 16:39:40 2023 -0800 Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120) --- .../manager/realtime/RealtimeTableDataManager.java | 3 +- ...adataAndDictionaryAggregationPlanMakerTest.java | 18 +- .../upsert/BasePartitionUpsertMetadataManager.java | 31 ++-- .../upsert/BaseTableUpsertMetadataManager.java | 80 ++++----- ...oncurrentMapPartitionUpsertMetadataManager.java | 20 +-- .../ConcurrentMapTableUpsertMetadataManager.java | 4 +- .../local/upsert/TableUpsertMetadataManager.java | 6 +- .../pinot/segment/local/upsert/UpsertContext.java | 197 +++++++++++++++++++++ .../MutableSegmentImplUpsertComparisonColTest.java | 37 ++-- .../mutable/MutableSegmentImplUpsertTest.java | 26 ++- ...rrentMapPartitionUpsertMetadataManagerTest.java | 147 +++++++-------- ...oncurrentMapTableUpsertMetadataManagerTest.java | 27 +-- 12 files changed, 395 insertions(+), 201 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index ffe0d47b46..8bf595d37a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -207,8 +207,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to // load segments into it _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig); - _tableUpsertMetadataManager.init(tableConfig, schema, this, _serverMetrics, _helixManager, - _segmentPreloadExecutor); + _tableUpsertMetadataManager.init(tableConfig, schema, this, _helixManager, _segmentPreloadExecutor); } // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java index a73f9c471f..cf3e33f938 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java @@ -38,12 +38,12 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.UpsertContext; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; -import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.IndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -54,7 +54,6 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeGranularitySpec; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeClass; @@ -62,6 +61,7 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -126,13 +126,17 @@ public class MetadataAndDictionaryAggregationPlanMakerTest { @BeforeClass public void loadSegment() throws Exception { + ServerMetrics.register(mock(ServerMetrics.class)); _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); - ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class); _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); - ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert( - new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"), - Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, - false, 0, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null); + UpsertContext upsertContext = + new UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class)) + .setPrimaryKeyColumns(Collections.singletonList("column6")) + .setComparisonColumns(Collections.singletonList("daysSinceEpoch")).setTableIndexDir(INDEX_DIR).build(); + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, upsertContext); + ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(upsertMetadataManager, + new ThreadSafeMutableRoaringBitmap(), null); } @AfterClass diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 9184da1587..b7f9696b11 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -65,6 +65,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected final String _tableNameWithType; protected final int _partitionId; + protected final UpsertContext _context; protected final List<String> _primaryKeyColumns; protected final List<String> _comparisonColumns; protected final String _deleteRecordColumn; @@ -97,25 +98,23 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps private int _numPendingOperations = 1; private boolean _closed; - protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, - List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn, - HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, - double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) { + protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) { _tableNameWithType = tableNameWithType; _partitionId = partitionId; - _primaryKeyColumns = primaryKeyColumns; - _comparisonColumns = comparisonColumns; - _deleteRecordColumn = deleteRecordColumn; - _hashFunction = hashFunction; - _partialUpsertHandler = partialUpsertHandler; - _enableSnapshot = enableSnapshot; - _metadataTTL = metadataTTL; - _deletedKeysTTL = deletedKeysTTL; - _tableIndexDir = tableIndexDir; - _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null; - _serverMetrics = serverMetrics; + _context = context; + _primaryKeyColumns = context.getPrimaryKeyColumns(); + _comparisonColumns = context.getComparisonColumns(); + _deleteRecordColumn = context.getDeleteRecordColumn(); + _hashFunction = context.getHashFunction(); + _partialUpsertHandler = context.getPartialUpsertHandler(); + _enableSnapshot = context.isSnapshotEnabled(); + _snapshotLock = _enableSnapshot ? new ReentrantReadWriteLock() : null; + _metadataTTL = context.getMetadataTTL(); + _deletedKeysTTL = context.getDeletedKeysTTL(); + _tableIndexDir = context.getTableIndexDir(); + _serverMetrics = ServerMetrics.get(); _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); - if (metadataTTL > 0) { + if (_metadataTTL > 0) { _largestSeenComparisonValue = loadWatermark(); } else { _largestSeenComparisonValue = Double.MIN_VALUE; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 6c16b4e7f5..e20b264c70 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -36,7 +36,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; @@ -56,76 +55,67 @@ import org.slf4j.LoggerFactory; public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager { private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableUpsertMetadataManager.class); - protected TableConfig _tableConfig; - protected Schema _schema; - protected TableDataManager _tableDataManager; protected String _tableNameWithType; - protected List<String> _primaryKeyColumns; - protected List<String> _comparisonColumns; - protected String _deleteRecordColumn; - protected HashFunction _hashFunction; - protected PartialUpsertHandler _partialUpsertHandler; - protected boolean _enableSnapshot; - protected double _metadataTTL; - protected double _deletedKeysTTL; - protected File _tableIndexDir; - protected ServerMetrics _serverMetrics; + protected TableDataManager _tableDataManager; protected HelixManager _helixManager; protected ExecutorService _segmentPreloadExecutor; + protected UpsertContext _context; private volatile boolean _isPreloading = false; @Override - public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, - ServerMetrics serverMetrics, HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor) { - _tableConfig = tableConfig; - _schema = schema; - _tableDataManager = tableDataManager; + public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager, + @Nullable ExecutorService segmentPreloadExecutor) { _tableNameWithType = tableConfig.getTableName(); + _tableDataManager = tableDataManager; + _helixManager = helixManager; + _segmentPreloadExecutor = segmentPreloadExecutor; UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); Preconditions.checkArgument(upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE, "Upsert must be enabled for table: %s", _tableNameWithType); - _primaryKeyColumns = schema.getPrimaryKeyColumns(); - Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns), + List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); + Preconditions.checkArgument(!CollectionUtils.isEmpty(primaryKeyColumns), "Primary key columns must be configured for upsert enabled table: %s", _tableNameWithType); - _comparisonColumns = upsertConfig.getComparisonColumns(); - if (_comparisonColumns == null) { - _comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName()); + List<String> comparisonColumns = upsertConfig.getComparisonColumns(); + if (comparisonColumns == null) { + comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName()); } - _deleteRecordColumn = upsertConfig.getDeleteRecordColumn(); - _hashFunction = upsertConfig.getHashFunction(); - + PartialUpsertHandler partialUpsertHandler = null; if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); Preconditions.checkArgument(partialUpsertStrategies != null, "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); - _partialUpsertHandler = + partialUpsertHandler = new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), - _comparisonColumns); + comparisonColumns); } - _enableSnapshot = upsertConfig.isEnableSnapshot(); - _metadataTTL = upsertConfig.getMetadataTTL(); - _deletedKeysTTL = upsertConfig.getDeletedKeysTTL(); - _tableIndexDir = tableDataManager.getTableDataDir(); - _serverMetrics = serverMetrics; - _helixManager = helixManager; - _segmentPreloadExecutor = segmentPreloadExecutor; - - initCustomVariables(); - + String deleteRecordColumn = upsertConfig.getDeleteRecordColumn(); + HashFunction hashFunction = upsertConfig.getHashFunction(); + boolean enableSnapshot = upsertConfig.isEnableSnapshot(); + boolean enablePreload = upsertConfig.isEnablePreload(); + double metadataTTL = upsertConfig.getMetadataTTL(); + double deletedKeysTTL = upsertConfig.getDeletedKeysTTL(); + File tableIndexDir = tableDataManager.getTableDataDir(); + _context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema) + .setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns) + .setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction) + .setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload) + .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setTableIndexDir(tableIndexDir).build(); LOGGER.info( "Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {}," + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}," + " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType, - _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, upsertConfig.getMode(), - _enableSnapshot, upsertConfig.isEnablePreload(), _metadataTTL, _deletedKeysTTL, _tableIndexDir); + primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), enableSnapshot, + enablePreload, metadataTTL, deletedKeysTTL, tableIndexDir); + + initCustomVariables(); - if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) { + if (enableSnapshot && enablePreload && segmentPreloadExecutor != null) { // Preloading the segments with snapshots for fast upsert metadata recovery. // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE). @@ -226,7 +216,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad @VisibleForTesting IndexLoadingConfig createIndexLoadingConfig() { return new IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(), - _tableConfig, _schema); + _context.getTableConfig(), _context.getSchema()); } @VisibleForTesting @@ -266,7 +256,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad } private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) { - File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _tableConfig); + File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _context.getTableConfig()); return new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME); } @@ -277,6 +267,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad @Override public UpsertConfig.Mode getUpsertMode() { - return _partialUpsertHandler == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL; + return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL; } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index fe36054f18..576d679368 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -19,9 +19,7 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.annotations.VisibleForTesting; -import java.io.File; import java.util.Iterator; -import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,7 +28,6 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMeter; -import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.segment.readers.LazyRow; @@ -38,7 +35,6 @@ import org.apache.pinot.segment.local.utils.HashUtils; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; -import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.PrimaryKey; import org.roaringbitmap.PeekableIntIterator; @@ -58,12 +54,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp @VisibleForTesting final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); - public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, - List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn, - HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, - double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) { - super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, - hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, deletedKeysTTL, tableIndexDir, serverMetrics); + public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) { + super(tableNameWithType, partitionId, context); } @Override @@ -268,15 +260,15 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get(); if (numDeletedTTLKeys > 0) { - _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}", - numDeletedTTLKeys, _tableNameWithType); + _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}", numDeletedTTLKeys, + _tableNameWithType); _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED, numDeletedTTLKeys); } int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get(); if (numMetadataTTLKeys > 0) { - _logger.info("Deleted {} primary keys based on metadataTTL in the table {}", - numMetadataTTLKeys, _tableNameWithType); + _logger.info("Deleted {} primary keys based on metadataTTL in the table {}", numMetadataTTLKeys, + _tableNameWithType); _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED, numMetadataTTLKeys); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 2aeee10749..b0593f8d5f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -35,9 +35,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta @Override public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { return _partitionMetadataManagerMap.computeIfAbsent(partitionId, - k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns, - _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, - _enableSnapshot, _metadataTTL, _deletedKeysTTL, _tableIndexDir, _serverMetrics)); + k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context)); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index b6ae51b265..2ac107d790 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.helix.HelixManager; -import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -35,8 +34,9 @@ import org.apache.pinot.spi.data.Schema; */ @ThreadSafe public interface TableUpsertMetadataManager extends Closeable { - void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics, - HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor); + + void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager, + @Nullable ExecutorService segmentPreloadExecutor); PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java new file mode 100644 index 0000000000..707c22151c --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; + + +public class UpsertContext { + private final TableConfig _tableConfig; + private final Schema _schema; + private final List<String> _primaryKeyColumns; + private final List<String> _comparisonColumns; + private final String _deleteRecordColumn; + private final HashFunction _hashFunction; + private final PartialUpsertHandler _partialUpsertHandler; + private final boolean _enableSnapshot; + private final boolean _enablePreload; + private final double _metadataTTL; + private final double _deletedKeysTTL; + private final File _tableIndexDir; + + private UpsertContext(TableConfig tableConfig, Schema schema, List<String> primaryKeyColumns, + List<String> comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction, + @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, boolean enablePreload, + double metadataTTL, double deletedKeysTTL, File tableIndexDir) { + _tableConfig = tableConfig; + _schema = schema; + _primaryKeyColumns = primaryKeyColumns; + _comparisonColumns = comparisonColumns; + _deleteRecordColumn = deleteRecordColumn; + _hashFunction = hashFunction; + _partialUpsertHandler = partialUpsertHandler; + _enableSnapshot = enableSnapshot; + _enablePreload = enablePreload; + _metadataTTL = metadataTTL; + _deletedKeysTTL = deletedKeysTTL; + _tableIndexDir = tableIndexDir; + } + + public TableConfig getTableConfig() { + return _tableConfig; + } + + public Schema getSchema() { + return _schema; + } + + public List<String> getPrimaryKeyColumns() { + return _primaryKeyColumns; + } + + public List<String> getComparisonColumns() { + return _comparisonColumns; + } + + public String getDeleteRecordColumn() { + return _deleteRecordColumn; + } + + public HashFunction getHashFunction() { + return _hashFunction; + } + + public PartialUpsertHandler getPartialUpsertHandler() { + return _partialUpsertHandler; + } + + public boolean isSnapshotEnabled() { + return _enableSnapshot; + } + + public boolean isPreloadEnabled() { + return _enablePreload; + } + + public double getMetadataTTL() { + return _metadataTTL; + } + + public double getDeletedKeysTTL() { + return _deletedKeysTTL; + } + + public File getTableIndexDir() { + return _tableIndexDir; + } + + public static class Builder { + private TableConfig _tableConfig; + private Schema _schema; + private List<String> _primaryKeyColumns; + private List<String> _comparisonColumns; + private String _deleteRecordColumn; + private HashFunction _hashFunction = HashFunction.NONE; + private PartialUpsertHandler _partialUpsertHandler; + private boolean _enableSnapshot; + private boolean _enablePreload; + private double _metadataTTL; + private double _deletedKeysTTL; + private File _tableIndexDir; + + public Builder setTableConfig(TableConfig tableConfig) { + _tableConfig = tableConfig; + return this; + } + + public Builder setSchema(Schema schema) { + _schema = schema; + return this; + } + + public Builder setPrimaryKeyColumns(List<String> primaryKeyColumns) { + _primaryKeyColumns = primaryKeyColumns; + return this; + } + + public Builder setComparisonColumns(List<String> comparisonColumns) { + _comparisonColumns = comparisonColumns; + return this; + } + + public Builder setDeleteRecordColumn(String deleteRecordColumn) { + _deleteRecordColumn = deleteRecordColumn; + return this; + } + + public Builder setHashFunction(HashFunction hashFunction) { + _hashFunction = hashFunction; + return this; + } + + public Builder setPartialUpsertHandler(PartialUpsertHandler partialUpsertHandler) { + _partialUpsertHandler = partialUpsertHandler; + return this; + } + + public Builder setEnableSnapshot(boolean enableSnapshot) { + _enableSnapshot = enableSnapshot; + return this; + } + + public Builder setEnablePreload(boolean enablePreload) { + _enablePreload = enablePreload; + return this; + } + + public Builder setMetadataTTL(double metadataTTL) { + _metadataTTL = metadataTTL; + return this; + } + + public Builder setDeletedKeysTTL(double deletedKeysTTL) { + _deletedKeysTTL = deletedKeysTTL; + return this; + } + + public Builder setTableIndexDir(File tableIndexDir) { + _tableIndexDir = tableIndexDir; + return this; + } + + public UpsertContext build() { + Preconditions.checkState(_tableConfig != null, "Table config must be set"); + Preconditions.checkState(_schema != null, "Schema must be set"); + Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), "Primary key columns must be set"); + Preconditions.checkState(CollectionUtils.isNotEmpty(_comparisonColumns), "Comparison columns must be set"); + Preconditions.checkState(_hashFunction != null, "Hash function must be set"); + Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set"); + return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, + _hashFunction, _partialUpsertHandler, _enableSnapshot, _enablePreload, _metadataTTL, _deletedKeysTTL, + _tableIndexDir); + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java index fe17e40dc2..38bc66ade5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java @@ -40,21 +40,35 @@ import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.utils.BooleanUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MutableSegmentImplUpsertComparisonColTest { private static final String SCHEMA_FILE_PATH = "data/test_upsert_comparison_col_schema.json"; private static final String DATA_FILE_PATH = "data/test_upsert_comparison_col_data.json"; - private static CompositeTransformer _recordTransformer; - private static Schema _schema; - private static TableConfig _tableConfig; - private static MutableSegmentImpl _mutableSegmentImpl; - private static PartitionUpsertMetadataManager _partitionUpsertMetadataManager; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); + + private TableDataManager _tableDataManager; + private TableConfig _tableConfig; + private Schema _schema; + private CompositeTransformer _recordTransformer; + private MutableSegmentImpl _mutableSegmentImpl; + private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; + + @BeforeClass + public void setUp() { + ServerMetrics.register(mock(ServerMetrics.class)); + _tableDataManager = mock(TableDataManager.class); + when(_tableDataManager.getTableDataDir()).thenReturn(new File(REALTIME_TABLE_NAME)); + } private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) { UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL); @@ -67,20 +81,19 @@ public class MutableSegmentImplUpsertComparisonColTest { throws Exception { URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH); URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); - _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfig) - .build(); + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build(); + _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig); - tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class), - mock(HelixManager.class), mock(ExecutorService.class)); + tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class), + mock(ExecutorService.class)); _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), - Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch", - _partitionUpsertMetadataManager, null); + Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch", _partitionUpsertMetadataManager, + null); GenericRow reuse = new GenericRow(); try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile, _schema.getColumnNames(), null)) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index 4f243c6000..702bd47f80 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -41,22 +41,36 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MutableSegmentImplUpsertTest { private static final String SCHEMA_FILE_PATH = "data/test_upsert_schema.json"; private static final String DATA_FILE_PATH = "data/test_upsert_data.json"; - private CompositeTransformer _recordTransformer; - private Schema _schema; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); + + private TableDataManager _tableDataManager; private TableConfig _tableConfig; + private Schema _schema; + private CompositeTransformer _recordTransformer; private MutableSegmentImpl _mutableSegmentImpl; private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; + @BeforeClass + public void setUp() { + ServerMetrics.register(mock(ServerMetrics.class)); + _tableDataManager = mock(TableDataManager.class); + when(_tableDataManager.getTableDataDir()).thenReturn(new File(REALTIME_TABLE_NAME)); + } + private UpsertConfig createPartialUpsertConfig(HashFunction hashFunction) { UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.PARTIAL); upsertConfigWithHash.setPartialUpsertStrategies(new HashMap<>()); @@ -76,15 +90,15 @@ public class MutableSegmentImplUpsertTest { throws Exception { URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH); URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); - _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash) + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfigWithHash) .setNullHandlingEnabled(true).build(); + _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig); - tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class), - mock(HelixManager.class), mock(ExecutorService.class)); + tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class), + mock(ExecutorService.class)); _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 35e621245c..17a1e048dc 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -48,7 +48,9 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.PrimaryKey; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.BytesUtils; @@ -58,6 +60,7 @@ import org.mockito.MockedConstruction; import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; @@ -72,13 +75,25 @@ import static org.testng.Assert.*; public class ConcurrentMapPartitionUpsertMetadataManagerTest { private static final String RAW_TABLE_NAME = "testTable"; private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); + private static final List<String> PRIMARY_KEY_COLUMNS = Collections.singletonList("pk"); + private static final List<String> COMPARISON_COLUMNS = Collections.singletonList("timeCol"); + private static final String DELETE_RECORD_COLUMN = "deleteCol"; private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ConcurrentMapPartitionUpsertMetadataManagerTest"); + private UpsertContext.Builder _contextBuilder; + @BeforeClass public void setUp() throws IOException { FileUtils.forceMkdir(INDEX_DIR); + ServerMetrics.register(mock(ServerMetrics.class)); + } + + @BeforeMethod + public void setUpContextBuilder() { + _contextBuilder = new UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class)) + .setPrimaryKeyColumns(PRIMARY_KEY_COLUMNS).setComparisonColumns(COMPARISON_COLUMNS).setTableIndexDir(INDEX_DIR); } @AfterClass @@ -90,9 +105,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { @Test public void testStartFinishOperation() { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, 0, INDEX_DIR, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); // Start 2 operations assertTrue(upsertMetadataManager.startOperation()); @@ -150,6 +163,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { @Test public void testUpsertMetadataCleanupWithTTLConfig() throws IOException { + _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30); verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120)); verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120)); verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120)); @@ -165,6 +179,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { @Test public void testGetQueryableDocIds() { + _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN); + boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true, false}; int[] docIds1 = new int[]{2, 4, 5}; MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); @@ -202,11 +218,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) throws IOException { - String comparisonColumn = "timeCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments; @@ -354,6 +368,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { @Test public void testAddReplaceRemoveSegmentWithRecordDelete() throws IOException { + _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN); verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE, false); verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5, false); verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, false); @@ -364,12 +379,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction, boolean enableSnapshot) throws IOException { - String comparisonColumn = "timeCol"; - String deleteRecordColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0, - 0, INDEX_DIR, mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).setEnableSnapshot(enableSnapshot).build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments; @@ -548,11 +560,11 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { } private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKeys, int[] timestamps, - @Nullable boolean[] deleteRecordFlags) { + @Nullable boolean[] deleteRecordFlags) { List<RecordInfo> recordInfoList = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new Integer(timestamps[i]), - deleteRecordFlags != null && deleteRecordFlags[i])); + deleteRecordFlags != null && deleteRecordFlags[i])); } return recordInfoList; } @@ -667,11 +679,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddRecord(HashFunction hashFunction) throws IOException { - String comparisonColumn = "timeCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment @@ -760,11 +770,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddOutOfOrderRecord(HashFunction hashFunction) throws IOException { - String comparisonColumn = "timeCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment @@ -828,11 +836,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { } private void verifyPreloadSegment(HashFunction hashFunction) { - String comparisonColumn = "timeCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment @@ -876,6 +882,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { @Test public void testAddRecordWithDeleteColumn() throws IOException { + _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN); verifyAddRecordWithDeleteColumn(HashFunction.NONE); verifyAddRecordWithDeleteColumn(HashFunction.MD5); verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3); @@ -883,12 +890,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction) throws IOException { - String comparisonColumn = "timeCol"; - String deleteColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, - false, 0, 0, INDEX_DIR, mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // queryableDocIds is same as validDocIds in the absence of delete markers @@ -985,21 +989,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { @Test public void testRemoveExpiredDeletedKeys() - throws IOException { + throws IOException { + _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN).setDeletedKeysTTL(20); verifyRemoveExpiredDeletedKeys(HashFunction.NONE); verifyRemoveExpiredDeletedKeys(HashFunction.MD5); verifyRemoveExpiredDeletedKeys(HashFunction.MURMUR3); } private void verifyRemoveExpiredDeletedKeys(HashFunction hashFunction) - throws IOException { - - String comparisonColumn = "timeCol"; - String deleteColumn = "deleteCol"; + throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, - false, 0, 20, INDEX_DIR, mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment @@ -1010,9 +1011,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap(); ImmutableSegmentImpl segment1 = - mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys)); + mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys)); upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, - getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator()); + getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator()); // Update records from the second segment ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); @@ -1069,16 +1070,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2}); assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{}); + + // Stop the metadata manager + upsertMetadataManager.stop(); + + // Close the metadata manager + upsertMetadataManager.close(); } private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue) throws IOException { - File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME); - ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, 0, tableDir, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -1095,8 +1098,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); ImmutableSegmentImpl segment1 = - mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), - earlierComparisonValue, null); + mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, earlierComparisonValue, + null); int[] docIds1 = new int[]{0, 1, 2, 3}; MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); @@ -1141,12 +1144,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddOutOfTTLSegment() throws IOException { - File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME); - ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -1163,8 +1162,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); ImmutableSegmentImpl segment1 = - mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), - new Double(80), null); + mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, new Double(80), null); int[] docIds1 = new int[]{0, 1, 2, 3}; MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); @@ -1199,8 +1197,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { validDocIdsSnapshot2.add(docIds2); ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); ImmutableSegmentImpl segment2 = - mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, Collections.singletonList("timeCol"), - new Double(80), validDocIdsSnapshot2); + mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, COMPARISON_COLUMNS, new Double(80), + validDocIdsSnapshot2); upsertMetadataManager.addSegment(segment2); // out of ttl segment should not be added to recordLocationMap assertEquals(recordLocationMap.size(), 5); @@ -1214,12 +1212,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddOutOfTTLSegmentWithRecordDelete() throws IOException { - String comparisonColumn = "timeCol"; - String deleteRecordColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30, - 0, INDEX_DIR, mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments; @@ -1235,8 +1229,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { int[] docIds1 = new int[]{2, 4, 5}; MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); validDocIdsSnapshot1.add(docIds1); - ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1, - Collections.singletonList(comparisonColumn), new Double(120), validDocIdsSnapshot1); + ImmutableSegmentImpl segment1 = + mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1, COMPARISON_COLUMNS, + new Double(120), validDocIdsSnapshot1); // get recordInfo from validDocIdSnapshot. // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} @@ -1266,7 +1261,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { validDocIdsSnapshot2.add(docIds2); ImmutableSegmentImpl segment2 = mockImmutableSegmentWithEndTime(2, validDocIds2, queryableDocIds2, getPrimaryKeyList(numRecords, primaryKeys), - Collections.singletonList(comparisonColumn), new Double(40), validDocIdsSnapshot2); + COMPARISON_COLUMNS, new Double(40), validDocIdsSnapshot2); // get recordInfo from validDocIdSnapshot. // segment2 snapshot: 3 -> {3, 40}, 4 -> {4, 40} @@ -1299,12 +1294,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[] deleteFlags, MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap queryableDocIds) { - String comparisonColumn = "timeCol"; - String deleteRecordColumn = "deleteCol"; ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30, - 0, INDEX_DIR, mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class, (mockReader, context) -> { @@ -1318,9 +1309,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ColumnMetadata columnMetadata = mock(ColumnMetadata.class); when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length); when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{ - this.put(comparisonColumn, columnMetadata); + this.put(COMPARISON_COLUMNS.get(0), columnMetadata); }}); - when(columnMetadata.getMaxValue()).thenReturn(null); ImmutableSegmentImpl segment = mockImmutableSegmentWithSegmentMetadata(1, new ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata, @@ -1331,12 +1321,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddSegmentForTTL(Comparable comparisonValue) throws IOException { - File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME); - ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; @@ -1352,8 +1338,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); ImmutableSegmentImpl segment1 = - mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), -1, - null); + mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, -1, null); int[] docIds1 = new int[]{0, 1, 2, 3}; MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); @@ -1395,9 +1380,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyPersistAndLoadWatermark() throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, 0, INDEX_DIR, - mock(ServerMetrics.class)); + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); double currentTimeMs = System.currentTimeMillis(); upsertMetadataManager.persistWatermark(currentTimeMs); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java index 526e0920c3..8ceeb4ce0b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java @@ -59,12 +59,19 @@ import static org.testng.Assert.assertTrue; public class ConcurrentMapTableUpsertMetadataManagerTest { private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ConcurrentMapTableUpsertMetadataManagerTest"); + private static final String REALTIME_TABLE_NAME = "testTable_REALTIME"; + private static final File TABLE_DATA_DIR = new File(TEMP_DIR, REALTIME_TABLE_NAME); + + private TableDataManager _tableDataManager; private ExecutorService _segmentPreloadExecutor; @BeforeClass public void setUp() throws Exception { FileUtils.deleteQuietly(TEMP_DIR); + ServerMetrics.register(mock(ServerMetrics.class)); + _tableDataManager = mock(TableDataManager.class); + when(_tableDataManager.getTableDataDir()).thenReturn(TABLE_DATA_DIR); _segmentPreloadExecutor = Executors.newFixedThreadPool(1); } @@ -86,16 +93,14 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { // Preloading is skipped as snapshot is not enabled. ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager(); assertFalse(mgr.isPreloading()); - mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class), - _segmentPreloadExecutor); + mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor); assertFalse(mgr.isPreloading()); // Preloading is skipped as preloading is not turned on. upsertConfig.setEnableSnapshot(true); mgr = new ConcurrentMapTableUpsertMetadataManager(); assertFalse(mgr.isPreloading()); - mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class), - _segmentPreloadExecutor); + mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor); assertFalse(mgr.isPreloading()); upsertConfig.setEnablePreload(true); @@ -103,8 +108,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { assertFalse(mgr.isPreloading()); // The preloading logic will hit on error as the HelixManager mock is not fully setup. But failure of preloading // should not fail the init() method. - mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class), - _segmentPreloadExecutor); + mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor); assertFalse(mgr.isPreloading()); } @@ -141,14 +145,13 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { }; // Setup mocks for TableConfig and Schema. - String tableNameWithType = "myTable_REALTIME"; TableConfig tableConfig = mock(TableConfig.class); UpsertConfig upsertConfig = new UpsertConfig(); upsertConfig.setComparisonColumn("ts"); upsertConfig.setEnablePreload(true); upsertConfig.setEnableSnapshot(true); when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig); - when(tableConfig.getTableName()).thenReturn(tableNameWithType); + when(tableConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME); Schema schema = mock(Schema.class); when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk")); @@ -169,16 +172,18 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg01"); realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE); when(propertyStore.get( - eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg01")), any(), + eq(ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, "online_seg01")), any(), anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord()); realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg02"); realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE); when(propertyStore.get( - eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg02")), any(), + eq(ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, "online_seg02")), any(), anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord()); // No snapshot file for online_seg01, so it's skipped. TableDataManager tableDataManager = mock(TableDataManager.class); + when(tableDataManager.getTableDataDir()).thenReturn(TABLE_DATA_DIR); + File seg01IdxDir = new File(TEMP_DIR, "online_seg01"); FileUtils.forceMkdir(seg01IdxDir); when(tableDataManager.getSegmentDataDir("online_seg01", null, tableConfig)).thenReturn(seg01IdxDir); @@ -189,7 +194,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest { when(tableDataManager.getSegmentDataDir("online_seg02", null, tableConfig)).thenReturn(seg02IdxDir); assertFalse(mgr.isPreloading()); - mgr.init(tableConfig, schema, tableDataManager, mock(ServerMetrics.class), helixManager, _segmentPreloadExecutor); + mgr.init(tableConfig, schema, tableDataManager, helixManager, _segmentPreloadExecutor); assertEquals(preloadedSegments.size(), 1); assertTrue(preloadedSegments.contains("online_seg02")); assertTrue(wasPreloading.get()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org