This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 83b0c14fda add metrics tracking lucene near real-time refresh delay 
(#13307)
83b0c14fda is described below

commit 83b0c14fdaf60f785ea3e525c32581db1eaf3ed8
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Tue Jun 25 19:49:10 2024 -0700

    add metrics tracking lucene near real-time refresh delay (#13307)
    
    * add metrics for lucene nrt delay
    
    * license
    
    * fix test, mock ServerMetrics
    
    * lint
    
    * missed mock metrics
    
    * missed cherry-pick
    
    * close ordering
    
    * address comment
    
    * use LLCSegmentName instead
---
 .../apache/pinot/common/metrics/ServerGauge.java   |   2 +
 .../RealtimeLuceneIndexingDelayTracker.java        | 211 +++++++++++++++++++++
 .../RealtimeLuceneRefreshListener.java             | 151 +++++++++++++++
 .../invertedindex/RealtimeLuceneTextIndex.java     |   8 +
 .../creator/impl/text/LuceneTextIndexCreator.java  |   4 +
 .../converter/RealtimeSegmentConverterTest.java    |   7 +-
 .../invertedindex/LuceneMutableTextIndexTest.java  |   8 +-
 .../NativeAndLuceneMutableTextIndexTest.java       |   7 +-
 .../RealtimeLuceneIndexingDelayTrackerTest.java    | 118 ++++++++++++
 .../RealtimeLuceneRefreshListenerTest.java         | 202 ++++++++++++++++++++
 10 files changed, 713 insertions(+), 5 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 072c048c55..fb4eba3340 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -70,6 +70,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   // Ingestion delay metrics
   REALTIME_INGESTION_DELAY_MS("milliseconds", false),
   END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false),
+  LUCENE_INDEXING_DELAY_MS("milliseconds", false),
+  LUCENE_INDEXING_DELAY_DOCS("documents", false),
   // Needed to track if valid doc id snapshots are present for faster restarts
   UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false),
   UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", 
false),
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTracker.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTracker.java
new file mode 100644
index 0000000000..4f7a39265f
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTracker.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+
+
+/**
+ * RealtimeLuceneIndexingDelayTracker is used to track the last Lucene refresh 
time for each segment,
+ * and each partition of a table.
+ * <p>
+ * To reduce maximum metric cardinality, only the maximum delay for a 
partition is emitted. In practice,
+ * segment/column granularity is similar.
+ * <p>
+ * For example, if a table has 3 segments and 2 partitions, the metrics 
emitted will be:
+ * <p> - luceneIndexDelayMs.table1.partition0
+ * <p> - luceneIndexDelayMs.table1.partition1
+ * <p> - luceneIndexDelayDocs.table1.partition0
+ * <p> - luceneIndexDelayDocs.table1.partition1
+ * <p> where the values are the maximum indexDelayMs or indexDelayDocs across 
all segments for that partition
+ */
+public class RealtimeLuceneIndexingDelayTracker {
+  private final Map<String, TableDelay> _tableToPartitionToDelayMs;
+  // Lock is used to prevent removing a gauge while new suppliers are being 
registered. Otherwise, it is possible
+  // for the gauge to be removed after 
ServerMetrics.setOrUpdatePartitionGauge() has been called for the next segment
+  private final ReentrantLock _lock = new ReentrantLock();
+
+  private RealtimeLuceneIndexingDelayTracker() {
+    _tableToPartitionToDelayMs = new HashMap<>();
+  }
+
+  /**
+   * Returns the singleton instance of the RealtimeLuceneIndexingDelayTracker.
+   */
+  public static RealtimeLuceneIndexingDelayTracker getInstance() {
+    return SingletonHolder.INSTANCE;
+  }
+
+  private static class SingletonHolder {
+    private static final RealtimeLuceneIndexingDelayTracker INSTANCE = new 
RealtimeLuceneIndexingDelayTracker();
+  }
+
+  public void registerDelaySuppliers(String tableName, String segmentName, 
String columnName, int partition,
+      Supplier<Integer> numDocsDelaySupplier, Supplier<Long> 
timeMsDelaySupplier) {
+    _lock.lock();
+    try {
+      TableDelay tableDelay = 
_tableToPartitionToDelayMs.getOrDefault(tableName, new TableDelay(tableName));
+      tableDelay.registerDelaySuppliers(segmentName, columnName, partition, 
numDocsDelaySupplier, timeMsDelaySupplier);
+      _tableToPartitionToDelayMs.put(tableName, tableDelay);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Clears the entry for a given table, segment, column, and partition.
+   * When only one entry remains for the partition at the time of the call, the partition's gauges are removed.
+   */
+  public void clear(String tableName, String segmentName, String columnName, 
int partition) {
+    _lock.lock();
+    try {
+      TableDelay tableDelay = _tableToPartitionToDelayMs.get(tableName);
+      if (tableDelay != null) {
+        tableDelay.clearPartitionDelay(segmentName, columnName, partition);
+        if (tableDelay.isEmpty()) {
+          _tableToPartitionToDelayMs.remove(tableName);
+        }
+      }
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Reset the tracker by removing all lastRefreshTimeMs data.
+   */
+  @VisibleForTesting
+  public void reset() {
+    _tableToPartitionToDelayMs.clear();
+  }
+
+  /**
+   * TableDelay is used to track the Lucene indexing delay for each 
partition of a table.
+   */
+  private static class TableDelay {
+    final String _tableName;
+    final Map<Integer, PartitionDelay> _partitionDelayMap;
+    final ServerMetrics _serverMetrics;
+
+    TableDelay(String tableName) {
+      _tableName = tableName;
+      _partitionDelayMap = new HashMap<>();
+      _serverMetrics = ServerMetrics.get();
+    }
+
+    void registerDelaySuppliers(String segmentName, String columnName, int 
partition,
+        Supplier<Integer> numDocsDelaySupplier, Supplier<Long> 
timeMsDelaySupplier) {
+      PartitionDelay partitionDelay = 
_partitionDelayMap.getOrDefault(partition, new PartitionDelay(partition));
+      partitionDelay.registerDelaySuppliers(segmentName, columnName, 
numDocsDelaySupplier, timeMsDelaySupplier);
+      _partitionDelayMap.put(partition, partitionDelay);
+      updateMetrics(partitionDelay);
+    }
+
+    /**
+     * Clears the entry for a given segment and partition. If the entry for 
the partition is empty, the
+     * gauge is removed. It is important for any stale gauge to be removed as 
the next segment may not be on the same
+     * server.
+     */
+    void clearPartitionDelay(String segmentName, String columnName, int 
partition) {
+      PartitionDelay partitionDelay = _partitionDelayMap.get(partition);
+      if (partitionDelay != null) {
+        if (partitionDelay.numEntries() == 1) {
+          clearMetrics(partitionDelay);
+          _partitionDelayMap.remove(partition);
+        }
+        partitionDelay.clearSegmentDelay(segmentName, columnName);
+      }
+    }
+
+    void updateMetrics(PartitionDelay partitionDelay) {
+      _serverMetrics.setOrUpdatePartitionGauge(_tableName, 
partitionDelay._partition,
+          ServerGauge.LUCENE_INDEXING_DELAY_MS, 
partitionDelay::getMaxTimeMsDelay);
+      _serverMetrics.setOrUpdatePartitionGauge(_tableName, 
partitionDelay._partition,
+          ServerGauge.LUCENE_INDEXING_DELAY_DOCS, 
partitionDelay::getMaxNumDocsDelay);
+    }
+
+    void clearMetrics(PartitionDelay partitionDelay) {
+      _serverMetrics.removePartitionGauge(_tableName, 
partitionDelay._partition, ServerGauge.LUCENE_INDEXING_DELAY_MS);
+      _serverMetrics.removePartitionGauge(_tableName, 
partitionDelay._partition,
+          ServerGauge.LUCENE_INDEXING_DELAY_DOCS);
+    }
+
+    boolean isEmpty() {
+      return _partitionDelayMap.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+      return "TableDelay{_tableName=" + _tableName + "}";
+    }
+  }
+
+  /**
+   * PartitionDelay is used to track the delay suppliers registered for each 
segment and column of a given partition.
+   */
+  private static class PartitionDelay {
+    final int _partition;
+    final Map<String, Supplier<Integer>> _columnNumDocsDelaySuppliers;
+    final Map<String, Supplier<Long>> _columnTimeMsDelaySuppliers;
+
+    PartitionDelay(int partition) {
+      _partition = partition;
+      _columnNumDocsDelaySuppliers = new HashMap<>();
+      _columnTimeMsDelaySuppliers = new HashMap<>();
+    }
+
+    void registerDelaySuppliers(String segmentName, String columnName, 
Supplier<Integer> numDocsDelaySupplier,
+        Supplier<Long> timeMsDelaySupplier) {
+      _columnNumDocsDelaySuppliers.put(getKey(segmentName, columnName), 
numDocsDelaySupplier);
+      _columnTimeMsDelaySuppliers.put(getKey(segmentName, columnName), 
timeMsDelaySupplier);
+    }
+
+    void clearSegmentDelay(String segmentName, String columnName) {
+      _columnNumDocsDelaySuppliers.remove(getKey(segmentName, columnName));
+      _columnTimeMsDelaySuppliers.remove(getKey(segmentName, columnName));
+    }
+
+    long getMaxTimeMsDelay() {
+      return 
_columnTimeMsDelaySuppliers.values().stream().map(Supplier::get).max(Long::compareTo).orElse(-1L);
+    }
+
+    long getMaxNumDocsDelay() {
+      return 
_columnNumDocsDelaySuppliers.values().stream().map(Supplier::get).max(Integer::compareTo).orElse(-1);
+    }
+
+    int numEntries() {
+      return _columnNumDocsDelaySuppliers.size();
+    }
+
+    String getKey(String segmentName, String columnName) {
+      return segmentName + "." + columnName;
+    }
+
+    @Override
+    public String toString() {
+      return "PartitionDelay{_partition=" + _partition + "}";
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListener.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListener.java
new file mode 100644
index 0000000000..4c8d5595ab
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListener.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.function.Supplier;
+import org.apache.lucene.search.ReferenceManager;
+
+
+/**
+ * RealtimeLuceneRefreshListener is a listener that listens to the refresh of 
a Lucene index reader
+ * and updates the metrics for the delay between the last refresh time and the 
current time, as well as
+ * the number of documents that have been added since the last refresh.
+ * <p>
+ * RefreshListener.beforeRefresh() is called before a refresh attempt, and 
RefreshListener.afterRefresh() is called
+ * after the refresh attempt. If the lock cannot be acquired for a refresh, 
then neither method will be called.
+ * <p>
+ * Tracking can be used in the future to handoff between a small live index, 
and the current reference searcher.
+ */
+public class RealtimeLuceneRefreshListener implements 
ReferenceManager.RefreshListener {
+
+  private final RealtimeLuceneIndexingDelayTracker 
_realtimeLuceneIndexingDelayTracker;
+  private final int _partition;
+  private final String _tableName;
+  private final String _segmentName;
+  private final String _columnName;
+  private final Supplier<Integer> _numDocsSupplier;
+  private final Supplier<Integer> _numDocsDelaySupplier;
+  private final Supplier<Long> _timeMsDelaySupplier;
+  private long _lastRefreshTimeMs;
+  private int _lastRefreshNumDocs;
+  private int _numDocsBeforeRefresh;
+  private Clock _clock;
+
+  /**
+   * Create a new RealtimeLuceneRefreshListener that uses the system UTC clock.
+   * @param tableName Table name
+   * @param segmentName Segment name
+   * @param partition Partition number
+   * @param numDocsSupplier Supplier for retrieving the current number of 
documents in the index
+   */
+  public RealtimeLuceneRefreshListener(String tableName, String segmentName, 
String columnName, int partition,
+      Supplier<Integer> numDocsSupplier) {
+    this(tableName, segmentName, columnName, partition, numDocsSupplier, 
Clock.systemUTC());
+  }
+
+  /**
+   * Create a new RealtimeLuceneRefreshListener with a clock. Intended for 
testing.
+   * @param tableName Table name
+   * @param segmentName Segment name
+   * @param partition Partition number
+   * @param numDocsSupplier Supplier for retrieving the current number of 
documents in the index
+   * @param clock Clock to use for time
+   */
+  @VisibleForTesting
+  public RealtimeLuceneRefreshListener(String tableName, String segmentName, 
String columnName, int partition,
+      Supplier<Integer> numDocsSupplier, Clock clock) {
+    _partition = partition;
+    _tableName = tableName;
+    _segmentName = segmentName;
+    _columnName = columnName;
+    _numDocsSupplier = numDocsSupplier;
+    _clock = clock;
+
+    // When the listener is first created, the reader is current
+    _lastRefreshTimeMs = _clock.millis();
+    _lastRefreshNumDocs = _numDocsSupplier.get();
+
+    // Initialize delay suppliers
+    _numDocsDelaySupplier = initNumDocsDelaySupplier();
+    _timeMsDelaySupplier = initTimeMsDelaySupplier();
+
+    // Register with RealtimeLuceneIndexingDelayTracker
+    _realtimeLuceneIndexingDelayTracker = 
RealtimeLuceneIndexingDelayTracker.getInstance();
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers(_tableName, 
_segmentName, _columnName, _partition,
+        _numDocsDelaySupplier, _timeMsDelaySupplier);
+  }
+
+  @Override
+  public void beforeRefresh() {
+    // Record the lower bound of the number of docs that a refreshed searcher 
might see
+    _numDocsBeforeRefresh = _numDocsSupplier.get();
+  }
+
+  /**
+   * @param didRefresh true if the searcher reference was swapped to a new 
reference, otherwise false
+   */
+  @Override
+  public void afterRefresh(boolean didRefresh) {
+    // If didRefresh is false but no docs were added since the last refresh, 
still update the last refresh time
+    if (didRefresh || _lastRefreshNumDocs == _numDocsBeforeRefresh) {
+      _lastRefreshTimeMs = _clock.millis();
+      _lastRefreshNumDocs = _numDocsBeforeRefresh;
+    }
+  }
+
+  private Supplier<Integer> initNumDocsDelaySupplier() {
+    return () -> _numDocsSupplier.get() - _lastRefreshNumDocs;
+  }
+
+  private Supplier<Long> initTimeMsDelaySupplier() {
+    return () -> {
+      if (_numDocsDelaySupplier.get() == 0) {
+        // if numDocsDelay is zero, consider the reference refreshed
+        _lastRefreshTimeMs = _clock.millis();
+      }
+      return _clock.millis() - _lastRefreshTimeMs;
+    };
+  }
+
+  public void close() {
+    _realtimeLuceneIndexingDelayTracker.clear(_tableName, _segmentName, 
_columnName, _partition);
+  }
+
+  @VisibleForTesting
+  public Supplier<Integer> getNumDocsDelaySupplier() {
+    return _numDocsDelaySupplier;
+  }
+
+  @VisibleForTesting
+  public Supplier<Long> getTimeMsDelaySupplier() {
+    return _timeMsDelaySupplier;
+  }
+
+  @VisibleForTesting
+  public void setClock(Clock clock) {
+    _clock = clock;
+  }
+
+  @VisibleForTesting
+  public Clock getClock() {
+    return _clock;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index 751bff517a..d858655843 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -28,6 +28,7 @@ import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SearcherManager;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.segment.local.utils.LuceneTextIndexUtils;
@@ -55,6 +56,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
   private final String _column;
   private final String _segmentName;
   private boolean _enablePrefixSuffixMatchingInPhraseQueries = false;
+  private final RealtimeLuceneRefreshListener _refreshListener;
 
   /**
    * Created by {@link MutableSegmentImpl}
@@ -81,6 +83,11 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
               false /* commitOnClose */, false, null, null, config);
       IndexWriter indexWriter = _indexCreator.getIndexWriter();
       _searcherManager = new SearcherManager(indexWriter, false, false, null);
+
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      _refreshListener = new 
RealtimeLuceneRefreshListener(llcSegmentName.getTableName(), segmentName, 
column,
+          llcSegmentName.getPartitionGroupId(), _indexCreator::getNumDocs);
+      _searcherManager.addListener(_refreshListener);
       _analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
       _enablePrefixSuffixMatchingInPhraseQueries = 
config.isEnablePrefixSuffixMatchingInPhraseQueries();
     } catch (Exception e) {
@@ -197,6 +204,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
     try {
       _searcherManager.close();
       _searcherManager = null;
+      _refreshListener.close(); // clean up metrics prior to closing 
_indexCreator, as they contain a reference to it
       _indexCreator.close();
       _analyzer.close();
     } catch (Exception e) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index 7a60e4b5c5..36b02b7d20 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -374,4 +374,8 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
     String tmpSegmentName = indexDir.getParentFile().getName();
     return tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4, 
tmpSegmentName.lastIndexOf('-'));
   }
+
+  public int getNumDocs() {
+    return _nextDocId;
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index 033acc7dbf..c08fe41b0c 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.segment.local.realtime.converter;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
@@ -33,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
@@ -65,9 +65,11 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -88,8 +90,9 @@ public class RealtimeSegmentConverterTest {
   private static final File TMP_DIR =
       new File(FileUtils.getTempDirectory(), 
RealtimeSegmentConverterTest.class.getName());
 
+  @BeforeClass
   public void setup() {
-    Preconditions.checkState(TMP_DIR.mkdirs());
+    ServerMetrics.register(mock(ServerMetrics.class));
   }
 
   @Test
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
index 5d1a2440a5..efa00fe3bf 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.search.SearcherManager;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.spi.index.TextIndexConfig;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -32,6 +33,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 
 
@@ -43,6 +45,10 @@ public class LuceneMutableTextIndexTest {
 
   private RealtimeLuceneTextIndex _realtimeLuceneTextIndex;
 
+  public LuceneMutableTextIndexTest() {
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
   private String[][] getTextData() {
     return new String[][]{
         {"realtime stream processing"}, {"publish subscribe", "columnar 
processing for data warehouses", "concurrency"}
@@ -61,7 +67,7 @@ public class LuceneMutableTextIndexTest {
     TextIndexConfig config =
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     _realtimeLuceneTextIndex =
-        new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar", 
config);
+        new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, 
"table__0__1__20240602T0014Z", config);
     String[][] documents = getTextData();
     String[][] repeatedDocuments = getRepeatedData();
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
index ca1c94ceb8..6c1c664694 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.search.SearcherManager;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.spi.index.TextIndexConfig;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 
 
@@ -71,14 +73,15 @@ public class NativeAndLuceneMutableTextIndexTest {
   @BeforeClass
   public void setUp()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
     TextIndexConfig config =
         new TextIndexConfig(false, null, null, false, false, null, null, true, 
500, null, false);
     _realtimeLuceneTextIndex =
-        new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar", 
config);
+        new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, 
"table__0__1__20240602T0014Z", config);
     _nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME);
 
     _realtimeLuceneMVTextIndex =
-        new RealtimeLuceneTextIndex(MV_TEXT_COLUMN_NAME, INDEX_DIR, "fooBar", 
config);
+        new RealtimeLuceneTextIndex(MV_TEXT_COLUMN_NAME, INDEX_DIR, 
"table__0__1__20240602T0014Z", config);
     _nativeMutableMVTextIndex = new 
NativeMutableTextIndex(MV_TEXT_COLUMN_NAME);
 
     String[] documents = getTextData();
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTrackerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTrackerTest.java
new file mode 100644
index 0000000000..1b006bd955
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexingDelayTrackerTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.util.function.Supplier;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+
+
+public class RealtimeLuceneIndexingDelayTrackerTest {
+  private RealtimeLuceneIndexingDelayTracker 
_realtimeLuceneIndexingDelayTracker;
+  private ServerMetrics _serverMetrics;
+
+  @BeforeMethod
+  public void setUp() {
+    _serverMetrics = mock(ServerMetrics.class);
+    ServerMetrics.deregister();
+    ServerMetrics.register(_serverMetrics);
+    _realtimeLuceneIndexingDelayTracker = 
RealtimeLuceneIndexingDelayTracker.getInstance();
+    _realtimeLuceneIndexingDelayTracker.reset();
+  }
+
+  @Test
+  public void testRegistersGaugesPerTable() {
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", 
"segment1", "column1", 1, () -> 0, () -> 0L);
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table2", 
"segment1", "column1", 1, () -> 0, () -> 0L);
+
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS),
+        Mockito.any());
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_MS),
+        Mockito.any());
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table2"), eq(1), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS),
+        Mockito.any());
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table2"), eq(1), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_MS),
+        Mockito.any());
+  }
+
+  @Test
+  public void testRegistersGaugesPerPartition() {
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", 
"segment1", "column1", 1, () -> 0, () -> 0L);
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", 
"segment1", "column1", 2, () -> 0, () -> 0L);
+
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS),
+        Mockito.any());
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(1), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_MS),
+        Mockito.any());
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(2), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_DOCS),
+        Mockito.any());
+    verify(_serverMetrics).setOrUpdatePartitionGauge(eq("table1"), eq(2), 
eq(ServerGauge.LUCENE_INDEXING_DELAY_MS),
+        Mockito.any());
+  }
+
+  @Test
+  public void testCleansUpGauges() {
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", 
"segment1", "column1", 1, () -> 0, () -> 0L);
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", 
"segment2", "column1", 1, () -> 0, () -> 0L);
+
+    // should not call remove if there are still other segments for the table 
and partition
+    _realtimeLuceneIndexingDelayTracker.clear("table1", "segment1", "column1", 
1);
+    verify(_serverMetrics, times(0)).removePartitionGauge("table1", 1, 
ServerGauge.LUCENE_INDEXING_DELAY_MS);
+    verify(_serverMetrics, times(0)).removePartitionGauge("table1", 1, 
ServerGauge.LUCENE_INDEXING_DELAY_MS);
+
+    _realtimeLuceneIndexingDelayTracker.clear("table1", "segment2", "column1", 
1);
+    // if the last segment is removed, the partition gauge should be removed 
as well
+    verify(_serverMetrics).removePartitionGauge("table1", 1, 
ServerGauge.LUCENE_INDEXING_DELAY_DOCS);
+    verify(_serverMetrics).removePartitionGauge("table1", 1, 
ServerGauge.LUCENE_INDEXING_DELAY_MS);
+  }
+
+  @Test
+  public void testEmitsMaxDelayPerPartition() {
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", 
"segment1", "column1", 1, () -> 10, () -> 20L);
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table1", 
"segment2", "column1", 1, () -> 5, () -> 15L);
+    _realtimeLuceneIndexingDelayTracker.registerDelaySuppliers("table2", 
"segment1", "column1", 1, () -> 25, () -> 30L);
+
+    verifyGaugeValue("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_DOCS, 10);
+    verifyGaugeValue("table1", 1, ServerGauge.LUCENE_INDEXING_DELAY_MS, 20);
+    verifyGaugeValue("table2", 1, ServerGauge.LUCENE_INDEXING_DELAY_DOCS, 25);
+    verifyGaugeValue("table2", 1, ServerGauge.LUCENE_INDEXING_DELAY_MS, 30);
+  }
+
+  /**
+   * Helper method to verify the value of a gauge. If the gauge's supplier is 
updated multiple times, only
+   * the last value will be verified.
+   */
+  private void verifyGaugeValue(String table, int partition, ServerGauge 
gauge, long expectedValue) {
+    ArgumentCaptor<Supplier<Long>> gaugeSupplierCaptor = 
ArgumentCaptor.forClass(Supplier.class);
+    verify(_serverMetrics, atLeastOnce()).setOrUpdatePartitionGauge(eq(table), 
eq(partition), eq(gauge),
+        gaugeSupplierCaptor.capture());
+    assertEquals(gaugeSupplierCaptor.getValue().get(), expectedValue, 
"unexpected reported value for gauge");
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListenerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListenerTest.java
new file mode 100644
index 0000000000..ea77f3e2c1
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneRefreshListenerTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.function.Supplier;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+
+
+public class RealtimeLuceneRefreshListenerTest {
+
+  @BeforeClass
+  public void setUp() {
+    ServerMetrics.register(mock(ServerMetrics.class)); // register a mock ServerMetrics once, before the tests run
+  }
+
+  @Test
+  public void testRefreshTrue() {
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, 
numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = 
listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: no docs indexed and no time elapsed, so both delays are zero
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // 10 docs are indexed and 10 ms pass without a refresh; both delays should reflect that
+    incrementNumDocs(numDocsSupplier, 10);
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // a refresh attempt starts; the reported delays should not change yet
+    listener.beforeRefresh();
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // the refresh succeeds 10 ms later; both delays should reset to zero
+    incrementClock(listener, 10);
+    listener.afterRefresh(true);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+  }
+
+  @Test
+  public void testRefreshFalse() {
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, 
numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = 
listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: no docs indexed and no time elapsed, so both delays are zero
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // 10 docs are indexed and 10 ms pass without a refresh; both delays should reflect that
+    incrementNumDocs(numDocsSupplier, 10);
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // a refresh attempt starts; the reported delays should not change yet
+    listener.beforeRefresh();
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // afterRefresh(false) 10 ms later: the doc delay stays at 10 and the time delay grows to 20 ms
+    incrementClock(listener, 10);
+    listener.afterRefresh(false);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 20);
+  }
+
+  @Test
+  public void testRefreshFalseWithNoDocsAdded() {
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, 
numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = 
listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: no docs indexed and no time elapsed, so both delays are zero
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // 10 ms pass but no docs are indexed, so both delays should stay zero
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // a refresh attempt starts; the delays should still be zero
+    listener.beforeRefresh();
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // afterRefresh(false) 10 ms later; with no docs added the delays should remain zero
+    incrementClock(listener, 10);
+    listener.afterRefresh(false);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+  }
+
+  @Test
+  public void testFirstRefresh() {
+    // index creator is initialized with a pause before docs are indexed, so 
we must ensure the first refresh
+    // does not report an excessive delay
+    MutableIntSupplier numDocsSupplier = new MutableIntSupplier(0);
+    Clock clock = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
+    RealtimeLuceneRefreshListener listener =
+        new RealtimeLuceneRefreshListener("table1", "segment1", "column1", 1, 
numDocsSupplier, clock);
+    Supplier<Integer> numDocsDelaySupplier = 
listener.getNumDocsDelaySupplier();
+    Supplier<Long> timeMsDelaySupplier = listener.getTimeMsDelaySupplier();
+
+    // initial state: no docs indexed and no time elapsed, so both delays are zero
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // 10 ms pass but no docs are indexed, so both delays should stay zero
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // another 10 ms pass, still with no docs indexed; both delays should remain zero
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // the first docs are indexed only after that idle time has passed, therefore the time 
delay should be zero
+    incrementNumDocs(numDocsSupplier, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+
+    // a refresh attempt starts and 10 ms pass; the time delay should now be 10 ms
+    listener.beforeRefresh();
+    incrementClock(listener, 10);
+    assertEquals(numDocsDelaySupplier.get(), 10);
+    assertEquals(timeMsDelaySupplier.get(), 10);
+
+    // the refresh succeeds 10 ms later; both delays should reset to zero
+    incrementClock(listener, 10);
+    listener.afterRefresh(true);
+    assertEquals(numDocsDelaySupplier.get(), 0);
+    assertEquals(timeMsDelaySupplier.get(), 0);
+  }
+
+  private void incrementClock(RealtimeLuceneRefreshListener listener, long 
millis) {
+    Clock offsetClock = Clock.offset(listener.getClock(), 
Duration.ofMillis(millis));
+    listener.setClock(offsetClock); // swap in a clock shifted by millis relative to the listener's current clock
+  }
+
+  private void incrementNumDocs(MutableIntSupplier mutableIntSupplier, int 
docs) {
+    mutableIntSupplier.increment(docs); // raise the supplied doc count by the given number of docs
+  }
+
+  // Test helper for simulating an increasing doc count: an Integer supplier whose value can be bumped
+  private static class MutableIntSupplier implements Supplier<Integer> {
+    private int _value;
+
+    public MutableIntSupplier(int initialValue) {
+      _value = initialValue;
+    }
+
+    @Override
+    public Integer get() {
+      return _value; // current count, boxed on return
+    }
+
+    public void increment(int i) {
+      _value += i; // add i to the current count
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to