This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 83b0c14fda add metrics tracking lucene near real-time refresh delay (#13307) 83b0c14fda is described below commit 83b0c14fdaf60f785ea3e525c32581db1eaf3ed8 Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Tue Jun 25 19:49:10 2024 -0700 add metrics tracking lucene near real-time refresh delay (#13307) * add metrics for lucene nrt delay * license * fix test, mock ServerMetrics * lint * missed mock metrics * missed cherry-pick * close ordering * address comment * use LLCSegmentName instead --- .../apache/pinot/common/metrics/ServerGauge.java | 2 + .../RealtimeLuceneIndexingDelayTracker.java | 211 +++++++++++++++++++++ .../RealtimeLuceneRefreshListener.java | 151 +++++++++++++++ .../invertedindex/RealtimeLuceneTextIndex.java | 8 + .../creator/impl/text/LuceneTextIndexCreator.java | 4 + .../converter/RealtimeSegmentConverterTest.java | 7 +- .../invertedindex/LuceneMutableTextIndexTest.java | 8 +- .../NativeAndLuceneMutableTextIndexTest.java | 7 +- .../RealtimeLuceneIndexingDelayTrackerTest.java | 118 ++++++++++++ .../RealtimeLuceneRefreshListenerTest.java | 202 ++++++++++++++++++++ 10 files changed, 713 insertions(+), 5 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index 072c048c55..fb4eba3340 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -70,6 +70,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge { // Ingestion delay metrics REALTIME_INGESTION_DELAY_MS("milliseconds", false), END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false), + LUCENE_INDEXING_DELAY_MS("milliseconds", false), + LUCENE_INDEXING_DELAY_DOCS("documents", false), // Needed to track if valid doc id snapshots are present 
for faster restarts UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false), UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTracker.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTracker.java new file mode 100644 index 0000000000..4f7a39265f --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTracker.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import org.apache.pinot.common.metrics.ServerGauge; +import org.apache.pinot.common.metrics.ServerMetrics; + + +/** + * RealtimeLuceneIndexingDelayTracker is used to track the Lucene refresh delay (time and number of documents) of + * each segment/column, grouped by the partition of a table. + * <p> + * To reduce maximum metric cardinality, only the maximum delay for a partition is emitted. In practice, + * segment/column granularity is similar. + * <p> + * For example, if a table has 3 segments and 2 partitions, the metrics emitted will be: + * <p> - luceneIndexDelayMs.table1.partition0 + * <p> - luceneIndexDelayMs.table1.partition1 + * <p> - luceneIndexDelayDocs.table1.partition0 + * <p> - luceneIndexDelayDocs.table1.partition1 + * <p> where the values are the maximum indexDelayMs or indexDelayDocs across all segments for that partition + */ +public class RealtimeLuceneIndexingDelayTracker { + // Keyed by table name; each TableDelay holds the delay suppliers of that table's partitions + private final Map<String, TableDelay> _tableToPartitionToDelayMs; + // Lock is used to prevent removing a gauge while new suppliers are being registered. Otherwise, it is possible + // for the gauge to be removed after ServerMetrics.setOrUpdatePartitionGauge() has been called for the next segment + private final ReentrantLock _lock = new ReentrantLock(); + + private RealtimeLuceneIndexingDelayTracker() { + _tableToPartitionToDelayMs = new HashMap<>(); + } + + /** + * Returns the singleton instance of the RealtimeLuceneIndexingDelayTracker.
+ */ + public static RealtimeLuceneIndexingDelayTracker getInstance() { + return SingletonHolder.INSTANCE; + } + + private static class SingletonHolder { + private static final RealtimeLuceneIndexingDelayTracker INSTANCE = new RealtimeLuceneIndexingDelayTracker(); + } + + /** + * Registers the delay suppliers of one segment/column and (re)publishes the gauges of its partition. + * A TableDelay is created only when the table is not tracked yet, so repeated registrations for the same + * table do not allocate throwaway TableDelay instances. + */ + public void registerDelaySuppliers(String tableName, String segmentName, String columnName, int partition, + Supplier<Integer> numDocsDelaySupplier, Supplier<Long> timeMsDelaySupplier) { + _lock.lock(); + try { + _tableToPartitionToDelayMs.computeIfAbsent(tableName, TableDelay::new) + .registerDelaySuppliers(segmentName, columnName, partition, numDocsDelaySupplier, timeMsDelaySupplier); + } finally { + _lock.unlock(); + } + } + + /** + * Clears the entry for a given table, segment, and partition. + * If the entry for the partition is empty, the gauge is removed. + */ + public void clear(String tableName, String segmentName, String columnName, int partition) { + _lock.lock(); + try { + TableDelay tableDelay = _tableToPartitionToDelayMs.get(tableName); + if (tableDelay != null) { + tableDelay.clearPartitionDelay(segmentName, columnName, partition); + if (tableDelay.isEmpty()) { + _tableToPartitionToDelayMs.remove(tableName); + } + } + } finally { + _lock.unlock(); + } + } + + /** + * Reset the tracker by removing all tracked delay suppliers. Gauges already registered with ServerMetrics are + * not removed. Guarded by the same lock as the other mutators because the backing map is a plain HashMap. + */ + @VisibleForTesting + public void reset() { + _lock.lock(); + try { + _tableToPartitionToDelayMs.clear(); + } finally { + _lock.unlock(); + } + } + + /** + * TableDelay is used to track the latest Lucene refresh lastRefreshTimeMs for each partition, of a table.
+ */ + private static class TableDelay { + final String _tableName; + final Map<Integer, PartitionDelay> _partitionDelayMap; + final ServerMetrics _serverMetrics; + + TableDelay(String tableName) { + _tableName = tableName; + _partitionDelayMap = new HashMap<>(); + _serverMetrics = ServerMetrics.get(); + } + + /** + * Registers the suppliers of one segment/column under the given partition, then refreshes that partition's + * gauges. A PartitionDelay is created only when the partition is not tracked yet. + */ + void registerDelaySuppliers(String segmentName, String columnName, int partition, + Supplier<Integer> numDocsDelaySupplier, Supplier<Long> timeMsDelaySupplier) { + PartitionDelay partitionDelay = _partitionDelayMap.computeIfAbsent(partition, PartitionDelay::new); + partitionDelay.registerDelaySuppliers(segmentName, columnName, numDocsDelaySupplier, timeMsDelaySupplier); + updateMetrics(partitionDelay); + } + + /** + * Clears the entry for a given segment and partition. If the entry for the partition is empty, the + * gauge is removed. It is important for any stale gauge to be removed as the next segment may not be on the same + * server.
+ */ + void clearPartitionDelay(String segmentName, String columnName, int partition) { + PartitionDelay partitionDelay = _partitionDelayMap.get(partition); + if (partitionDelay != null) { + // Remove the entry first and only drop the gauges once no supplier of this partition remains. Checking the + // size beforehand would drop them while another segment is still registered if this key is unknown or was + // already cleared. + partitionDelay.clearSegmentDelay(segmentName, columnName); + if (partitionDelay.numEntries() == 0) { + clearMetrics(partitionDelay); + _partitionDelayMap.remove(partition); + } + } + } + + void updateMetrics(PartitionDelay partitionDelay) { + _serverMetrics.setOrUpdatePartitionGauge(_tableName, partitionDelay._partition, + ServerGauge.LUCENE_INDEXING_DELAY_MS, partitionDelay::getMaxTimeMsDelay); + _serverMetrics.setOrUpdatePartitionGauge(_tableName, partitionDelay._partition, + ServerGauge.LUCENE_INDEXING_DELAY_DOCS, partitionDelay::getMaxNumDocsDelay); + } + + void clearMetrics(PartitionDelay partitionDelay) { + _serverMetrics.removePartitionGauge(_tableName, partitionDelay._partition, ServerGauge.LUCENE_INDEXING_DELAY_MS); + _serverMetrics.removePartitionGauge(_tableName, partitionDelay._partition, + ServerGauge.LUCENE_INDEXING_DELAY_DOCS); + } + + boolean isEmpty() { + return _partitionDelayMap.isEmpty(); + } + + @Override + public String toString() { + return "TableDelay{_tableName=" + _tableName + "}"; + } + } + + /** + * PartitionDelay is used to track the latest Lucene refresh lastRefreshTimeMs for each segment, of a given partition.
+ */ + private static class PartitionDelay { + final int _partition; + final Map<String, Supplier<Integer>> _columnNumDocsDelaySuppliers; + final Map<String, Supplier<Long>> _columnTimeMsDelaySuppliers; + + PartitionDelay(int partition) { + _partition = partition; + _columnNumDocsDelaySuppliers = new HashMap<>(); + _columnTimeMsDelaySuppliers = new HashMap<>(); + } + + void registerDelaySuppliers(String segmentName, String columnName, Supplier<Integer> numDocsDelaySupplier, + Supplier<Long> timeMsDelaySupplier) { + _columnNumDocsDelaySuppliers.put(getKey(segmentName, columnName), numDocsDelaySupplier); + _columnTimeMsDelaySuppliers.put(getKey(segmentName, columnName), timeMsDelaySupplier); + } + + void clearSegmentDelay(String segmentName, String columnName) { + _columnNumDocsDelaySuppliers.remove(getKey(segmentName, columnName)); + _columnTimeMsDelaySuppliers.remove(getKey(segmentName, columnName)); + } + + long getMaxTimeMsDelay() { + return _columnTimeMsDelaySuppliers.values().stream().map(Supplier::get).max(Long::compareTo).orElse(-1L); + } + + long getMaxNumDocsDelay() { + return _columnNumDocsDelaySuppliers.values().stream().map(Supplier::get).max(Integer::compareTo).orElse(-1); + } + + int numEntries() { + return _columnNumDocsDelaySuppliers.size(); + } + + String getKey(String segmentName, String columnName) { + return segmentName + "." 
+ columnName; + } + + @Override + public String toString() { + return "PartitionDelay{_partition=" + _partition + "}"; + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListener.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListener.java new file mode 100644 index 0000000000..4c8d5595ab --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListener.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Clock; +import java.util.function.Supplier; +import org.apache.lucene.search.ReferenceManager; + + +/** + * RealtimeLuceneRefreshListener is a listener that listens to the refresh of a Lucene index reader + * and updates the metrics for the delay between the last refresh time and the current time, as well as + * the number of documents that have been added since the last refresh. 
+ * <p> + * RefreshListener.beforeRefresh() is called before a refresh attempt, and RefreshListener.afterRefresh() is called + * after the refresh attempt. If the lock cannot be acquired for a refresh, then neither method will be called. + * <p> + * Tracking can be used in the future to hand off between a small live index and the current reference searcher. + */ +public class RealtimeLuceneRefreshListener implements ReferenceManager.RefreshListener { + + private final RealtimeLuceneIndexingDelayTracker _realtimeLuceneIndexingDelayTracker; + private final int _partition; + private final String _tableName; + private final String _segmentName; + private final String _columnName; + private final Supplier<Integer> _numDocsSupplier; + private final Supplier<Integer> _numDocsDelaySupplier; + private final Supplier<Long> _timeMsDelaySupplier; + // volatile: written by afterRefresh() and by the time-delay supplier, and read by the delay suppliers that are + // handed to ServerMetrics gauges (which may be evaluated off the refresh thread); a plain long may also be + // written non-atomically (JLS 17.7) + private volatile long _lastRefreshTimeMs; + private volatile int _lastRefreshNumDocs; + private int _numDocsBeforeRefresh; + private Clock _clock; + + /** + * Create a new RealtimeLuceneRefreshListener using the system UTC clock. + * @param tableName Table name + * @param segmentName Segment name + * @param columnName Column name + * @param partition Partition number + * @param numDocsSupplier Supplier for retrieving the current number of documents in the index + */ + public RealtimeLuceneRefreshListener(String tableName, String segmentName, String columnName, int partition, + Supplier<Integer> numDocsSupplier) { + this(tableName, segmentName, columnName, partition, numDocsSupplier, Clock.systemUTC()); + } + + /** + * Create a new RealtimeLuceneRefreshListener with a clock. Intended for testing.
+ * @param tableName Table name + * @param segmentName Segment name + * @param columnName Column name + * @param partition Partition number + * @param numDocsSupplier Supplier for retrieving the current number of documents in the index + * @param clock Clock to use for time + */ + @VisibleForTesting + public RealtimeLuceneRefreshListener(String tableName, String segmentName, String columnName, int partition, + Supplier<Integer> numDocsSupplier, Clock clock) { + _partition = partition; + _tableName = tableName; + _segmentName = segmentName; + _columnName = columnName; + _numDocsSupplier = numDocsSupplier; + _clock = clock; + + // When the listener is first created, the reader is current + _lastRefreshTimeMs = _clock.millis(); + _lastRefreshNumDocs = _numDocsSupplier.get(); + + // Initialize delay suppliers + _numDocsDelaySupplier = initNumDocsDelaySupplier(); + _timeMsDelaySupplier = initTimeMsDelaySupplier(); + + // Register with RealtimeLuceneIndexingDelayTracker + _realtimeLuceneIndexingDelayTracker = RealtimeLuceneIndexingDelayTracker.getInstance(); + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers(_tableName, _segmentName, _columnName, _partition, + _numDocsDelaySupplier, _timeMsDelaySupplier); + } + + @Override + public void beforeRefresh() { + // Record the lower bound of the number of docs that a refreshed searcher might see + _numDocsBeforeRefresh = _numDocsSupplier.get(); + } + + /** + * @param didRefresh true if the searcher reference was swapped to a new reference, otherwise false + */ + @Override + public void afterRefresh(boolean didRefresh) { + // Also advance the refresh time when didRefresh is false but no documents were added since the last refresh: + // the current searcher already reflects every document indexed before this attempt, so the delay should not grow + if (didRefresh || _lastRefreshNumDocs == _numDocsBeforeRefresh) { + _lastRefreshTimeMs = _clock.millis(); + _lastRefreshNumDocs = _numDocsBeforeRefresh; + } + } + + // Number of documents indexed since the last observed refresh + private Supplier<Integer> initNumDocsDelaySupplier() { + return () -> _numDocsSupplier.get() - _lastRefreshNumDocs; + } + + private Supplier<Long>
initTimeMsDelaySupplier() { + return () -> { + if (_numDocsDelaySupplier.get() == 0) { + // if numDocsDelay is zero, consider the reference refreshed + _lastRefreshTimeMs = _clock.millis(); + } + return _clock.millis() - _lastRefreshTimeMs; + }; + } + + public void close() { + _realtimeLuceneIndexingDelayTracker.clear(_tableName, _segmentName, _columnName, _partition); + } + + @VisibleForTesting + public Supplier<Integer> getNumDocsDelaySupplier() { + return _numDocsDelaySupplier; + } + + @VisibleForTesting + public Supplier<Long> getTimeMsDelaySupplier() { + return _timeMsDelaySupplier; + } + + @VisibleForTesting + public void setClock(Clock clock) { + _clock = clock; + } + + @VisibleForTesting + public Clock getClock() { + return _clock; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java index 751bff517a..d858655843 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java @@ -28,6 +28,7 @@ import org.apache.lucene.queryparser.classic.QueryParser; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; import org.apache.pinot.segment.local.utils.LuceneTextIndexUtils; @@ -55,6 +56,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { private final String _column; private final String _segmentName; private boolean 
_enablePrefixSuffixMatchingInPhraseQueries = false; + private final RealtimeLuceneRefreshListener _refreshListener; /** * Created by {@link MutableSegmentImpl} @@ -81,6 +83,11 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { false /* commitOnClose */, false, null, null, config); IndexWriter indexWriter = _indexCreator.getIndexWriter(); _searcherManager = new SearcherManager(indexWriter, false, false, null); + + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + _refreshListener = new RealtimeLuceneRefreshListener(llcSegmentName.getTableName(), segmentName, column, + llcSegmentName.getPartitionGroupId(), _indexCreator::getNumDocs); + _searcherManager.addListener(_refreshListener); _analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer(); _enablePrefixSuffixMatchingInPhraseQueries = config.isEnablePrefixSuffixMatchingInPhraseQueries(); } catch (Exception e) { @@ -197,6 +204,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { try { _searcherManager.close(); _searcherManager = null; + _refreshListener.close(); // clean up metrics prior to closing _indexCreator, as they contain a reference to it _indexCreator.close(); _analyzer.close(); } catch (Exception e) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java index 7a60e4b5c5..36b02b7d20 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java @@ -374,4 +374,8 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator { String tmpSegmentName = indexDir.getParentFile().getName(); return tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 
4, tmpSegmentName.lastIndexOf('-')); } + + public int getNumDocs() { + return _nextDocId; + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java index 033acc7dbf..c08fe41b0c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.segment.local.realtime.converter; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.io.File; @@ -33,6 +32,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; @@ -65,9 +65,11 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -88,8 +90,9 @@ public class RealtimeSegmentConverterTest { private static final File TMP_DIR = new File(FileUtils.getTempDirectory(), 
RealtimeSegmentConverterTest.class.getName()); + @BeforeClass public void setup() { - Preconditions.checkState(TMP_DIR.mkdirs()); + ServerMetrics.register(mock(ServerMetrics.class)); } @Test diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java index 5d1a2440a5..efa00fe3bf 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.spi.index.TextIndexConfig; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -32,6 +33,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; @@ -43,6 +45,10 @@ public class LuceneMutableTextIndexTest { private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; + public LuceneMutableTextIndexTest() { + ServerMetrics.register(mock(ServerMetrics.class)); + } + private String[][] getTextData() { return new String[][]{ {"realtime stream processing"}, {"publish subscribe", "columnar processing for data warehouses", "concurrency"} @@ -61,7 +67,7 @@ public class LuceneMutableTextIndexTest { TextIndexConfig config = new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false); _realtimeLuceneTextIndex = - new 
RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar", config); + new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "table__0__1__20240602T0014Z", config); String[][] documents = getTextData(); String[][] repeatedDocuments = getRepeatedData(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java index ca1c94ceb8..6c1c664694 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java @@ -23,11 +23,13 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.spi.index.TextIndexConfig; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; @@ -71,14 +73,15 @@ public class NativeAndLuceneMutableTextIndexTest { @BeforeClass public void setUp() throws Exception { + ServerMetrics.register(mock(ServerMetrics.class)); TextIndexConfig config = new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false); _realtimeLuceneTextIndex = - new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar", config); + new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "table__0__1__20240602T0014Z", config); _nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME); _realtimeLuceneMVTextIndex = - new RealtimeLuceneTextIndex(MV_TEXT_COLUMN_NAME, INDEX_DIR, 
"fooBar", config); + new RealtimeLuceneTextIndex(MV_TEXT_COLUMN_NAME, INDEX_DIR, "table__0__1__20240602T0014Z", config); _nativeMutableMVTextIndex = new NativeMutableTextIndex(MV_TEXT_COLUMN_NAME); String[] documents = getTextData(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTrackerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTrackerTest.java new file mode 100644 index 0000000000..1b006bd955 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTrackerTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.util.function.Supplier; +import org.apache.pinot.common.metrics.ServerGauge; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; + + +public class RealtimeLuceneIndexingDelayTrackerTest { + private RealtimeLuceneIndexingDelayTracker _realtimeLuceneIndexingDelayTracker; + private ServerMetrics _serverMetrics; + + @BeforeMethod + public void setUp() { + _serverMetrics = mock(ServerMetrics.class); + ServerMetrics.deregister(); + ServerMetrics.register(_serverMetrics); + _realtimeLuceneIndexingDelayTracker = RealtimeLuceneIndexingDelayTracker.getInstance(); + _realtimeLuceneIndexingDelayTracker.reset(); + } + + @Test + public void testRegistersGaugesPerTable() { + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", "segment1", "column1", 1, () -> 0, () -> 0L); + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table2", "segment1", "column1", 1, () -> 0, () -> 0L); + + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS), + Mockito.any()); + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), eq(ServerGauge.LUCENE_INDEXING_DELAY_MS), + Mockito.any()); + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table2"), eq(1), eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS), + Mockito.any()); + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table2"), eq(1), eq(ServerGauge.LUCENE_INDEXING_DELAY_MS), + Mockito.any()); + } + + @Test + public void 
testRegistersGaugesPerPartition() { + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", "segment1", "column1", 1, () -> 0, () -> 0L); + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", "segment1", "column1", 2, () -> 0, () -> 0L); + + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS), + Mockito.any()); + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), eq(ServerGauge.LUCENE_INDEXING_DELAY_MS), + Mockito.any()); + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(2), eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS), + Mockito.any()); + verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(2), eq(ServerGauge.LUCENE_INDEXING_DELAY_MS), + Mockito.any()); + } + + @Test + public void testCleansUpGauges() { + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", "segment1", "column1", 1, () -> 0, () -> 0L); + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", "segment2", "column1", 1, () -> 0, () -> 0L); + + // should not call remove for either gauge while other segments remain for the table and partition + _realtimeLuceneIndexingDelayTracker.clear("table1", "segment1", "column1", 1); + verify(_serverMetrics, times(0)).removePartitionGauge("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_DOCS); + verify(_serverMetrics, times(0)).removePartitionGauge("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_MS); + + _realtimeLuceneIndexingDelayTracker.clear("table1", "segment2", "column1", 1); + // if the last segment is removed, the partition gauge should be removed as well + verify(_serverMetrics).removePartitionGauge("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_DOCS); + verify(_serverMetrics).removePartitionGauge("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_MS); + } + + @Test + public void testEmitsMaxDelayPerPartition() { + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", "segment1", "column1",
1, () -> 10, () -> 20L); + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", "segment2", "column1", 1, () -> 5, () -> 15L); + _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table2", "segment1", "column1", 1, () -> 25, () -> 30L); + + verifyGaugeValue("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_DOCS, 10); + verifyGaugeValue("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_MS, 20); + verifyGaugeValue("table2", 1, ServerGauge.LUCENE_INDEXING_DELAY_DOCS, 25); + verifyGaugeValue("table2", 1, ServerGauge.LUCENE_INDEXING_DELAY_MS, 30); + } + + /** + * Helper method to verify the value of a gauge. If the gauge's supplier is updated multiple times, only + * the last value will be verified. + */ + private void verifyGaugeValue(String table, int partition, ServerGauge gauge, long expectedValue) { + ArgumentCaptor<Supplier<Long>> gaugeSupplierCaptor = ArgumentCaptor.forClass(Supplier.class); + verify(_serverMetrics, atLeastOnce()).setOrUpdatePartitionGauge(eq(table), eq(partition), eq(gauge), + gaugeSupplierCaptor.capture()); + assertEquals(gaugeSupplierCaptor.getValue().get(), expectedValue, "unexpected reported value for gauge"); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListenerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListenerTest.java new file mode 100644 index 0000000000..ea77f3e2c1 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListenerTest.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.function.Supplier;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+
+
+public class RealtimeLuceneRefreshListenerTest {
+
+  @BeforeClass
+  public void setUp() {
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
+  @Test
+  public void testRefreshTrue() {
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: nothing indexed, no delay
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // time passes and docs are indexed; expect up-to-date delays
+    incrementNumDocs(numDocsSupplier, 10);
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // start a refresh; delays are unchanged until it completes
+    listener.beforeRefresh();
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // refresh succeeds, so both delays reset
+    incrementClock(listener, 10);
+    listener.afterRefresh(true);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+  }
+
+  @Test
+  public void testRefreshFalse() {
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: nothing indexed, no delay
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // time passes and docs are indexed; expect up-to-date delays
+    incrementNumDocs(numDocsSupplier, 10);
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // start a refresh; delays are unchanged until it completes
+    listener.beforeRefresh();
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // refresh did not refresh anything, so the delays keep accumulating
+    incrementClock(listener, 10);
+    listener.afterRefresh(false);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 20);
+  }
+
+  @Test
+  public void testRefreshFalseWithNoDocsAdded() {
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: nothing indexed, no delay
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // time passes but no docs are indexed, so there should be no delay
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // start a refresh; still nothing pending
+    listener.beforeRefresh();
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // refresh did not refresh anything, but with no pending docs there is still no delay
+    incrementClock(listener, 10);
+    listener.afterRefresh(false);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+  }
+
+  @Test
+  public void testFirstRefresh() {
+    // the index creator is initialized some time before the first docs are indexed, so we must ensure the first
+    // refresh does not report that idle period as delay
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: nothing indexed, no delay
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // time passes before any docs are indexed, so there should be no delay
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // more time passes and still no docs are indexed, so there should still be no delay
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // the first docs are indexed now (no clock advance): doc delay is 10, time delay has not started yet
+    incrementNumDocs(numDocsSupplier, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // start a refresh; 10ms pass while it is in progress
+    listener.beforeRefresh();
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // refresh succeeds, so both delays reset
+    incrementClock(listener, 10);
+    listener.afterRefresh(true);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+  }
+
+  private void incrementClock(RealtimeLuceneRefreshListener listener, long millis) {
+    Clock offsetClock = Clock.offset(listener.getClock(), Duration.ofMillis(millis));
+    listener.setClock(offsetClock);
+  }
+
+  private void incrementNumDocs(MutableIntSupplier mutableIntSupplier, int docs) {
+    mutableIntSupplier.increment(docs);
+  }
+
+  // Test helper: a doc-count supplier whose value the test can increase to simulate indexing
+  private static class MutableIntSupplier implements Supplier<Integer> {
+    private int _value;
+
+    public MutableIntSupplier(int initialValue) {
+      _value = initialValue;
+    }
+
+    @Override
+    public Integer get() {
+      return _value;
+    }
+
+    public void increment(int i) {
+      _value += i;
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org