This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a23c610309b Fixes all race conditions from stream consumer (#17089)
a23c610309b is described below
commit a23c610309b506afe8342ec257cfaea0dfd7e87b
Author: NOOB <[email protected]>
AuthorDate: Mon Nov 17 17:16:57 2025 +0530
Fixes all race conditions from stream consumer (#17089)
* Fixes all race conditions from stream consumer
* Adds test for tableDataManager cache
* Fixes bug related to fetching partition offset
* Fixes test and addresses comments
* Reduces code duplication
---
.../realtime/RealtimeSegmentDataManager.java | 68 ++++-----
.../realtime/RealtimeSegmentMetadataUtils.java | 65 +++++++++
.../manager/realtime/RealtimeTableDataManager.java | 43 ++++++
.../realtime/RealtimeTableDataManagerTest.java | 66 +++++++++
.../stream/kafka20/KafkaConsumerFactory.java | 9 ++
.../SynchronizedKafkaStreamMetadataProvider.java | 44 ++++++
.../stream/kafka30/KafkaConsumerFactory.java | 9 ++
.../SynchronizedKafkaStreamMetadataProvider.java | 44 ++++++
.../pinot/server/api/resources/DebugResource.java | 26 +++-
.../pinot/server/api/resources/TablesResource.java | 24 ++--
.../FreshnessBasedConsumptionStatusChecker.java | 26 ++--
.../IngestionBasedConsumptionStatusChecker.java | 6 +-
.../helix/OffsetBasedConsumptionStatusChecker.java | 4 +-
...FreshnessBasedConsumptionStatusCheckerTest.java | 156 ++++++++++++++++-----
.../OffsetBasedConsumptionStatusCheckerTest.java | 18 +--
.../pinot/spi/stream/StreamConsumerFactory.java | 4 +
.../pinot/spi/stream/StreamMetadataProvider.java | 3 +
17 files changed, 501 insertions(+), 114 deletions(-)
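The common thread across these files: callers stop asking each RealtimeSegmentDataManager to hit the stream for offsets and instead go through a shared, thread-safe StreamMetadataProvider owned by the RealtimeTableDataManager. A minimal sketch of the new read path, assuming local variables (tableDataManager, segmentDataManager) that are not part of this commit:

    // Sketch only: the consolidated read path for consumer state after this change.
    // Types come from org.apache.pinot.spi.stream and org.apache.pinot.core.data.manager.realtime.
    StreamMetadataProvider provider = tableDataManager.getStreamMetadataProvider(segmentDataManager);
    StreamPartitionMsgOffset latestOffset =
        RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(segmentDataManager, provider);
    Map<String, ConsumerPartitionState> partitionState =
        segmentDataManager.getConsumerPartitionState(latestOffset);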
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index e372be01e0c..b8e2ce9097e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -94,7 +94,6 @@ import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -1082,21 +1081,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
/**
* Returns the {@link ConsumerPartitionState} for the partition group.
*/
- public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
+ public Map<String, ConsumerPartitionState> getConsumerPartitionState(
+ @Nullable StreamPartitionMsgOffset latestMsgOffset) {
String partitionGroupId = String.valueOf(_partitionGroupId);
- return Collections.singletonMap(partitionGroupId, new ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
- getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000), _lastRowMetadata));
- }
-
- /**
- * Returns the {@link PartitionLagState} for the partition group.
- */
- public Map<String, PartitionLagState> getPartitionToLagState(
- Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
- if (_partitionMetadataProvider == null) {
- createPartitionMetadataProvider("Get Partition Lag State");
- }
- return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
+ return Collections.singletonMap(partitionGroupId,
+ new ConsumerPartitionState(partitionGroupId, getCurrentOffset(), getLastConsumedTimestamp(), latestMsgOffset,
+ _lastRowMetadata));
}
/**
@@ -1119,6 +1109,18 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
return _currentOffset;
}
+ public String getTableStreamName() {
+ return _tableStreamName;
+ }
+
+ public int getStreamPartitionId() {
+ return _streamPartitionId;
+ }
+
+ public StreamConsumerFactory getStreamConsumerFactory() {
+ return _streamConsumerFactory;
+ }
+
@Nullable
public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
return _latestStreamOffsetAtStartupTime;
@@ -1436,6 +1438,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
} while (!_shouldStop);
}
+ public StreamMetadataProvider getPartitionMetadataProvider() {
+ return _partitionMetadataProvider;
+ }
+
protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() {
// Post segmentConsumed to current leader.
// Retry maybe once if leader is not found.
@@ -1905,41 +1911,21 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
}
@Nullable
- public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) {
- return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog);
- }
-
- @Nullable
- public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
- return fetchLatestStreamOffset(maxWaitTimeMs, false);
- }
-
- @Nullable
- public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) {
- return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog);
+ private StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
+ return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);
}
@Nullable
- public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs) {
- return fetchEarliestStreamOffset(maxWaitTimeMs, false);
- }
-
- @Nullable
- private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs,
- boolean useDebugLog) {
+ private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs) {
if (_partitionMetadataProvider == null) {
createPartitionMetadataProvider("Fetch latest stream offset");
}
try {
return _partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, maxWaitTimeMs);
} catch (Exception e) {
- String logMessage = "Cannot fetch stream offset with criteria " + offsetCriteria + " for clientId " + _clientId
- + " and partitionGroupId " + _partitionGroupId + " with maxWaitTime " + maxWaitTimeMs;
- if (!useDebugLog) {
- _segmentLogger.warn(logMessage, e);
- } else {
- _segmentLogger.debug(logMessage, e);
- }
+ _segmentLogger.warn(
+ "Cannot fetch stream offset with criteria {} for clientId {} and
partitionGroupId {} with maxWaitTime {}",
+ offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs, e);
}
return null;
}
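With getPartitionToLagState() gone from the segment data manager, lag is now computed directly from the shared provider. A rough sketch of the replacement call pattern (variable names are illustrative; the real call sites are the DebugResource and TablesResource hunks further down):

    // Sketch: compute partition lag from the shared provider instead of the removed
    // RealtimeSegmentDataManager#getPartitionToLagState(...).
    Map<String, ConsumerPartitionState> partitionIdToStateMap =
        segmentDataManager.getConsumerPartitionState(latestMsgOffset);
    Map<String, PartitionLagState> partitionToLagState =
        streamMetadataProvider.getCurrentPartitionLagState(partitionIdToStateMap);
    partitionToLagState.forEach((partitionId, lag) ->
        LOGGER.info("partition {}: recordsLag={}", partitionId, lag.getRecordsLag()));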
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentMetadataUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentMetadataUtils.java
new file mode 100644
index 00000000000..faaf183c17e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentMetadataUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for realtime segment metadata operations.
+ */
+public class RealtimeSegmentMetadataUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentMetadataUtils.class);
+ private static final long STREAM_METADATA_FETCH_TIMEOUT_MS = 5000;
+
+ private RealtimeSegmentMetadataUtils() {
+ }
+
+ /**
+ * Fetches the latest stream offset for a segment's partition using the provided metadata provider.
+ * This encapsulates the common pattern of getting the partition ID from the segment and fetching
+ * the offset from the metadata provider.
+ *
+ * @param realtimeSegmentDataManager The segment data manager to get partition ID from
+ * @param streamMetadataProvider The stream metadata provider to use for fetching
+ * @return The latest stream offset for the partition, or null if not available
+ * @throws RuntimeException if fetching fails
+ */
+ @Nullable
+ public static StreamPartitionMsgOffset fetchLatestStreamOffset(RealtimeSegmentDataManager realtimeSegmentDataManager,
+ StreamMetadataProvider streamMetadataProvider) {
+ try {
+ int partitionId = realtimeSegmentDataManager.getStreamPartitionId();
+ Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+ streamMetadataProvider.fetchLatestStreamOffset(Collections.singleton(partitionId),
+ STREAM_METADATA_FETCH_TIMEOUT_MS);
+ return partitionMsgOffsetMap.get(partitionId);
+ } catch (Exception e) {
+ String segmentName = realtimeSegmentDataManager.getSegmentName();
+ LOGGER.error("Failed to fetch latest stream offset for segment: {}", segmentName, e);
+ throw new RuntimeException("Failed to fetch latest stream offset for segment: " + segmentName, e);
+ }
+ }
+}
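The utility is a thin wrapper; roughly the inline code it centralizes at each call site (timeout value and variable names assumed for illustration):

    // Sketch of the equivalent inline fetch the utility replaces.
    int partitionId = realtimeSegmentDataManager.getStreamPartitionId();
    Map<Integer, StreamPartitionMsgOffset> offsets =
        streamMetadataProvider.fetchLatestStreamOffset(Collections.singleton(partitionId), 5000L);
    StreamPartitionMsgOffset latestMsgOffset = offsets.get(partitionId);  // may be null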
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 3a379c4f703..d6c8407abaf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -20,9 +20,13 @@ package org.apache.pinot.core.data.manager.realtime;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -31,6 +35,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -82,7 +87,9 @@ import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
@@ -133,6 +140,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep interval
@Deprecated
private static final String SEGMENT_DOWNLOAD_TIMEOUT_MINUTES = "segmentDownloadTimeoutMinutes";
+ private static final Duration STREAM_METADATA_PROVIDER_CACHE_TTL = Duration.ofMinutes(10);
+
+ protected Cache<String, StreamMetadataProvider> _streamMetadataProviderCache;
private final BooleanSupplier _isServerReadyToServeQueries;
@@ -216,6 +226,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
}
_enforceConsumptionInOrder = isEnforceConsumptionInOrder();
+ _streamMetadataProviderCache = getStreamMetadataProviderCache();
// For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
if (isDedupEnabled() || isPartialUpsertEnabled()) {
@@ -248,6 +259,23 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
}
}
+ @VisibleForTesting
+ protected Cache<String, StreamMetadataProvider> getStreamMetadataProviderCache() {
+ return CacheBuilder.newBuilder()
+ .expireAfterAccess(STREAM_METADATA_PROVIDER_CACHE_TTL)
+ .removalListener((RemovalNotification<String, StreamMetadataProvider> notification) -> {
+ StreamMetadataProvider provider = notification.getValue();
+ if (provider != null) {
+ try {
+ provider.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close StreamMetadataProvider for key {}", notification.getKey(), e);
+ }
+ }
+ })
+ .build();
+ }
+
@Override
protected void doStart() {
}
@@ -355,6 +383,21 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
return segmentContexts;
}
+ /**
+ * Returns thread safe StreamMetadataProvider which is shared across different callers.
+ */
+ public StreamMetadataProvider getStreamMetadataProvider(RealtimeSegmentDataManager realtimeSegmentDataManager) {
+ String tableStreamName = realtimeSegmentDataManager.getTableStreamName();
+ StreamConsumerFactory streamConsumerFactory = realtimeSegmentDataManager.getStreamConsumerFactory();
+ try {
+ return _streamMetadataProviderCache.get(tableStreamName,
+ () -> streamConsumerFactory.createStreamMetadataProvider(tableStreamName, true));
+ } catch (ExecutionException e) {
+ LOGGER.error("Failed to get stream metadata provider for tableStream: {}", tableStreamName);
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Returns all partitionGroupIds for the partitions hosted by this server for current table.
* @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns.
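Because the provider is looked up through a Guava cache keyed by the table stream name, repeated calls within the TTL return the same instance, and the removal listener closes evicted providers. A hedged usage sketch (variables assumed, not from this commit):

    // Sketch: two lookups for the same "tableName-topicName" key share one provider instance
    // while the cache entry is live; after expiry a fresh provider is created and the old one is closed.
    StreamMetadataProvider first = realtimeTableDataManager.getStreamMetadataProvider(segmentDataManager);
    StreamMetadataProvider second = realtimeTableDataManager.getStreamMetadataProvider(segmentDataManager);
    assert first == second;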
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index ea266df007a..1c78091d16f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -18,15 +18,27 @@
*/
package org.apache.pinot.core.data.manager.realtime;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
+import java.time.Duration;
+import java.util.concurrent.Semaphore;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
+import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -60,4 +72,58 @@ public class RealtimeTableDataManagerTest {
assertEquals(timeFieldSpec.getDefaultNullValue(),
Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs)));
}
+
+ @Test
+ public void testStreamMetadataProviderCache() {
+ class FakeRealtimeTableDataManager extends RealtimeTableDataManager {
+ private boolean _cacheEntryRemovalNotified;
+ public FakeRealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
+ super(segmentBuildSemaphore);
+ _streamMetadataProviderCache = getStreamMetadataProviderCache();
+ }
+ public void updateCache() {
+ _streamMetadataProviderCache = CacheBuilder.newBuilder()
+ .expireAfterAccess(Duration.ofMillis(1))
+ .removalListener((RemovalNotification<String, StreamMetadataProvider> notification) -> {
+ StreamMetadataProvider provider = notification.getValue();
+ if (provider != null) {
+ try {
+ provider.close();
+ _cacheEntryRemovalNotified = true;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close StreamMetadataProvider for key {}", notification.getKey(), e);
+ }
+ }
+ })
+ .build();
+ }
+ public Cache<String, StreamMetadataProvider> getCache() {
+ return _streamMetadataProviderCache;
+ }
+ }
+ FakeRealtimeTableDataManager fakeRealtimeTableDataManager = new FakeRealtimeTableDataManager(null);
+
+ RealtimeSegmentDataManager mockRealtimeSegmentDataManager = Mockito.mock(RealtimeSegmentDataManager.class);
+ when(mockRealtimeSegmentDataManager.getTableStreamName()).thenReturn("testTable-testTopic");
+ StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
+ when(mockRealtimeSegmentDataManager.getStreamConsumerFactory()).thenReturn(
+ StreamConsumerFactoryProvider.create(streamConfig));
+
+ StreamMetadataProvider streamMetadataProvider =
+ fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager);
+ Assert.assertEquals(fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager),
+ streamMetadataProvider);
+
+ fakeRealtimeTableDataManager.updateCache();
+ Assert.assertEquals(fakeRealtimeTableDataManager.getCache().size(), 0);
+
+ StreamMetadataProvider streamMetadataProvider1 =
+ fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager);
+ Assert.assertNotEquals(streamMetadataProvider, streamMetadataProvider1);
+
+ TestUtils.waitForCondition(
+ aVoid -> !(fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager)
+ .equals(streamMetadataProvider1)), 5, 2000, "streamMetadataProvider returned from cache must be new.");
+ Assert.assertTrue(fakeRealtimeTableDataManager._cacheEntryRemovalNotified);
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 17d46c041be..849103c0b17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -45,6 +45,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
return new KafkaStreamMetadataProvider(clientId, _streamConfig);
}
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId, boolean concurrentAccessExpected) {
+ if (concurrentAccessExpected) {
+ return new SynchronizedKafkaStreamMetadataProvider(clientId, _streamConfig);
+ } else {
+ return createStreamMetadataProvider(clientId);
+ }
+ }
+
@Override
public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
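Callers opt into the synchronized variant through the new flag; a minimal sketch, assuming a StreamConsumerFactory already built from a Kafka StreamConfig and an existing clientId:

    // Sketch: concurrentAccessExpected=true yields the synchronized provider for Kafka;
    // false falls back to the existing single-threaded KafkaStreamMetadataProvider.
    StreamMetadataProvider shared = streamConsumerFactory.createStreamMetadataProvider(clientId, true);
    StreamMetadataProvider exclusive = streamConsumerFactory.createStreamMetadataProvider(clientId);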
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/SynchronizedKafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/SynchronizedKafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..5549dd734fc
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/SynchronizedKafkaStreamMetadataProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * A thread-safe variant of {@link KafkaStreamMetadataProvider} that synchronizes
+ * access to metadata fetch operations.
+ * This is particularly useful when a shared instance of {@link KafkaStreamMetadataProvider}
+ * is accessed concurrently by multiple threads.
+ */
+public class SynchronizedKafkaStreamMetadataProvider extends KafkaStreamMetadataProvider {
+
+ public SynchronizedKafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+ super(clientId, streamConfig);
+ }
+
+ @Override
+ public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+ synchronized (this) {
+ return super.fetchLatestStreamOffset(partitionIds, timeoutMillis);
+ }
+ }
+}
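Synchronizing on the provider instance serializes metadata fetches, which is what makes the cached, shared instance safe: KafkaConsumer is not thread-safe, and concurrent fetches previously surfaced as ConcurrentModificationException (see the handler removed from TablesResource below). A rough sketch of the access pattern this enables (executor, shared provider and timeout are assumed for illustration):

    // Sketch: multiple status checkers / REST handlers can now share one provider safely.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.submit(() -> sharedProvider.fetchLatestStreamOffset(Collections.singleton(0), 5000L));
    pool.submit(() -> sharedProvider.fetchLatestStreamOffset(Collections.singleton(1), 5000L));
    pool.shutdown();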
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java
index d9ee7067dbc..8847efc68b5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java
@@ -45,6 +45,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
return new KafkaStreamMetadataProvider(clientId, _streamConfig);
}
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId, boolean concurrentAccessExpected) {
+ if (concurrentAccessExpected) {
+ return new SynchronizedKafkaStreamMetadataProvider(clientId, _streamConfig);
+ } else {
+ return createStreamMetadataProvider(clientId);
+ }
+ }
+
@Override
public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/SynchronizedKafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/SynchronizedKafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..ea7f5897537
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/SynchronizedKafkaStreamMetadataProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka30;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * A thread-safe variant of {@link KafkaStreamMetadataProvider} that synchronizes
+ * access to metadata fetch operations.
+ * This is particularly useful when a shared instance of {@link KafkaStreamMetadataProvider}
+ * is accessed concurrently by multiple threads.
+ */
+public class SynchronizedKafkaStreamMetadataProvider extends KafkaStreamMetadataProvider {
+
+ public SynchronizedKafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+ super(clientId, streamConfig);
+ }
+
+ @Override
+ public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+ synchronized (this) {
+ return super.fetchLatestStreamOffset(partitionIds, timeoutMillis);
+ }
+ }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index dffe1cb347b..9ea7529a4ad 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -54,6 +54,8 @@ import org.apache.pinot.common.restlet.resources.SegmentServerDebugInfo;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -64,6 +66,9 @@ import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,7 +129,7 @@ public class DebugResource {
Map<String, SegmentErrorInfo> segmentErrorsMap = tableDataManager.getSegmentErrors();
SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
try {
- SegmentConsumerInfo segmentConsumerInfo = getSegmentConsumerInfo(segmentDataManager, tableType);
+ SegmentConsumerInfo segmentConsumerInfo = getSegmentConsumerInfo(tableDataManager, segmentDataManager, tableType);
long segmentSize = getSegmentSize(segmentDataManager);
SegmentErrorInfo segmentErrorInfo = segmentErrorsMap.get(segmentName);
return new SegmentServerDebugInfo(segmentName, FileUtils.byteCountToDisplaySize(segmentSize), segmentConsumerInfo,
@@ -172,7 +177,8 @@ public class DebugResource {
segmentsWithDataManagers.add(segmentName);
// Get segment consumer info.
- SegmentConsumerInfo segmentConsumerInfo = getSegmentConsumerInfo(segmentDataManager, tableType);
+ SegmentConsumerInfo segmentConsumerInfo =
+ getSegmentConsumerInfo(tableDataManager, segmentDataManager, tableType);
// Get segment size.
long segmentSize = getSegmentSize(segmentDataManager);
@@ -209,17 +215,25 @@ public class DebugResource {
.getSegment()).getSegmentSizeBytes() : 0;
}
- private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDataManager, TableType tableType) {
+ private SegmentConsumerInfo getSegmentConsumerInfo(TableDataManager tableDataManager,
+ SegmentDataManager segmentDataManager, TableType tableType) {
SegmentConsumerInfo segmentConsumerInfo = null;
if (tableType == TableType.REALTIME) {
RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
- Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState();
+ StreamMetadataProvider streamMetadataProvider =
+ ((RealtimeTableDataManager) (tableDataManager)).getStreamMetadataProvider(realtimeSegmentDataManager);
+ StreamPartitionMsgOffset latestMsgOffset =
+ RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(realtimeSegmentDataManager, streamMetadataProvider);
+ Map<String, ConsumerPartitionState> partitionIdToStateMap =
+ realtimeSegmentDataManager.getConsumerPartitionState(latestMsgOffset);
Map<String, String> currentOffsets = realtimeSegmentDataManager.getPartitionToCurrentOffset();
- Map<String, String> upstreamLatest = partitionStateMap.entrySet().stream().collect(
+ Map<String, String> upstreamLatest = partitionIdToStateMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString()));
Map<String, String> recordsLagMap = new HashMap<>();
Map<String, String> availabilityLagMsMap = new HashMap<>();
- realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k, v) -> {
+ Map<String, PartitionLagState> partitionToLagState =
+ streamMetadataProvider.getCurrentPartitionLagState(partitionIdToStateMap);
+ partitionToLagState.forEach((k, v) -> {
recordsLagMap.put(k, v.getRecordsLag());
availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
});
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b482d662a5e..4ad3a3810ac 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -35,7 +35,6 @@ import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
-import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -88,6 +87,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -108,6 +108,9 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -1101,11 +1104,17 @@ public class TablesResource {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
- Map<String, ConsumerPartitionState> partitionStateMap =
- realtimeSegmentDataManager.getConsumerPartitionState();
+ StreamMetadataProvider streamMetadataProvider =
+ ((RealtimeTableDataManager) (tableDataManager)).getStreamMetadataProvider(realtimeSegmentDataManager);
+ StreamPartitionMsgOffset latestMsgOffset =
+ RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(realtimeSegmentDataManager, streamMetadataProvider);
+ Map<String, ConsumerPartitionState> partitionIdToStateMap =
+ realtimeSegmentDataManager.getConsumerPartitionState(latestMsgOffset);
Map<String, String> recordsLagMap = new HashMap<>();
Map<String, String> availabilityLagMsMap = new HashMap<>();
- realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k, v) -> {
+ Map<String, PartitionLagState> partitionToLagState =
+ streamMetadataProvider.getCurrentPartitionLagState(partitionIdToStateMap);
+ partitionToLagState.forEach((k, v) -> {
recordsLagMap.put(k, v.getRecordsLag());
availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
});
@@ -1114,15 +1123,12 @@ public class TablesResource {
segmentConsumerInfoList.add(new SegmentConsumerInfo(segmentDataManager.getSegmentName(),
realtimeSegmentDataManager.getConsumerState().toString(),
realtimeSegmentDataManager.getLastConsumedTimestamp(),
partitiionToOffsetMap,
- new SegmentConsumerInfo.PartitionOffsetInfo(partitiionToOffsetMap, partitionStateMap.entrySet().stream()
+ new SegmentConsumerInfo.PartitionOffsetInfo(partitiionToOffsetMap, partitionIdToStateMap.entrySet()
+ .stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())),
recordsLagMap, availabilityLagMsMap)));
}
}
- } catch (ConcurrentModificationException e) {
- LOGGER.warn("Multi-threaded access is unsafe for KafkaConsumer, caught
exception when fetching stream offset",
- e);
- return segmentConsumerInfoList;
} catch (Exception e) {
throw new WebApplicationException("Caught exception when getting consumer info for table: " + realtimeTableName);
} finally {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 01f429a5117..13dd8075841 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -24,6 +24,9 @@ import java.util.Set;
import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -56,7 +59,8 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum
}
@Override
- protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager) {
+ protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager,
+ RealtimeTableDataManager realtimeTableDataManager) {
long now = now();
long latestIngestionTimestamp = rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
@@ -73,7 +77,12 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum
// message is too old to pass the freshness check. We check this condition separately to avoid hitting
// the stream consumer to check partition count if we're already caught up.
StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset();
- StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000);
+
+ StreamMetadataProvider streamMetadataProvider =
+ realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
+ StreamPartitionMsgOffset latestStreamOffset =
+ RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(rtSegmentDataManager, streamMetadataProvider);
+
if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
_logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}. "
+ "But the current ingested offset is equal to the latest available offset {}.", segmentName, freshnessMs,
@@ -81,21 +90,18 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum
return true;
}
- StreamPartitionMsgOffset earliestStreamOffset = rtSegmentDataManager.fetchEarliestStreamOffset(5000);
-
long idleTimeMs = rtSegmentDataManager.getTimeSinceEventLastConsumedMs();
if (segmentHasBeenIdleLongerThanThreshold(idleTimeMs)) {
_logger.warn("Segment {} with freshness {}ms has not caught up within
min freshness {}. "
- + "But the current ingested offset {} has been idle for {}ms. At
offset {}. Earliest offset {}. "
- + "Latest offset {}.", segmentName, freshnessMs,
_minFreshnessMs, currentOffset, idleTimeMs,
- currentOffset,
- earliestStreamOffset, latestStreamOffset);
+ + "But the current ingested offset {} has been idle for {}ms. At
offset {}. " + "Latest offset {}.",
+ segmentName, freshnessMs, _minFreshnessMs, currentOffset,
idleTimeMs, currentOffset, latestStreamOffset);
return true;
}
_logger.info("Segment {} with freshness {}ms has not caught up within "
- + "min freshness {}. At offset {}. Earliest offset {}. Latest
offset {}.", segmentName, freshnessMs,
- _minFreshnessMs, currentOffset, earliestStreamOffset,
latestStreamOffset);
+ + "min freshness {}. At offset {}. Latest offset {}.",
segmentName, freshnessMs, _minFreshnessMs,
+ currentOffset,
+ latestStreamOffset);
return false;
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index df8081f136a..56ed05920e3 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -93,7 +94,7 @@ public abstract class IngestionBasedConsumptionStatusChecker {
continue;
}
RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+ if (isSegmentCaughtUp(segName, rtSegmentDataManager, (RealtimeTableDataManager) tableDataManager)) {
caughtUpSegments.add(segName);
}
} finally {
@@ -135,7 +136,8 @@ public abstract class IngestionBasedConsumptionStatusChecker {
return numLaggingSegments;
}
- protected abstract boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager);
+ protected abstract boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager,
+ RealtimeTableDataManager realtimeTableDataManager);
protected boolean isOffsetCaughtUp(String segmentName, StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
index b4f2ba12e25..e38f4fbbe5f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -42,7 +43,8 @@ public class OffsetBasedConsumptionStatusChecker extends IngestionBasedConsumpti
}
@Override
- protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager) {
+ protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager,
+ RealtimeTableDataManager realtimeTableDataManager) {
StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 6b10fd1b36a..40b3a40e2e9 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -28,12 +28,15 @@ import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -75,8 +78,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -104,9 +107,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrB0.getSegment()).thenReturn(mockSegment);
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1500));
- when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
- when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+ when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+ when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+ when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+ when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// current offset latest stream offset current time last ingestion time
@@ -128,8 +144,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
}
@Test
- public void testWithDroppedTableAndSegment()
- throws InterruptedException {
+ public void testWithDroppedTableAndSegment() {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
@@ -146,7 +161,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(null);
@@ -155,7 +170,12 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(null);
- when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
// ensure negative values are ignored
setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
@@ -202,8 +222,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -215,9 +235,21 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
- when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
- when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+ when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+ when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+ when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+ when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -268,8 +300,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -281,9 +313,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
- when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+ when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+ when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+ when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
+ when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
@@ -344,8 +389,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -357,9 +402,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
- when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+ when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+ when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+ when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
+ when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
@@ -396,8 +454,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -409,9 +467,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
- when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
- when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(2000));
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+ when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+ when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+ when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+ when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
+
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -459,8 +530,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -472,9 +543,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
- when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
- when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(200));
- when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(null);
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+ when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+ when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+ when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+ when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+ when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of());
+
when(segMngrA0.getCurrentOffset()).thenReturn(null);
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -495,7 +579,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
// segB0 0 0 100 0
setupLatestIngestionTimestamp(segMngrA0, 89L);
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
- when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(0));
+ when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0);
}
}
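The mocked interactions above imply the call pattern the checkers now rely on: read the segment's partition id, resolve a StreamMetadataProvider through the RealtimeTableDataManager, and fetch the latest offset for just that partition in one call. The following is a hedged sketch of that pattern only, not the committed checker code; the class/method names (LatestOffsetLookupSketch, fetchLatest, timeoutMs) are illustrative, and the Map<Integer, StreamPartitionMsgOffset> return type is inferred from the javadoc and the Map.of(...) stubs above.

import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

// Hedged sketch only; names and the exact Map signature are assumptions, not code from this commit.
public class LatestOffsetLookupSketch {
  static StreamPartitionMsgOffset fetchLatest(RealtimeTableDataManager tableDataManager,
      RealtimeSegmentDataManager segmentDataManager, long timeoutMs) throws Exception {
    // Partition id and per-segment provider, as mocked in the tests above.
    int partitionId = segmentDataManager.getStreamPartitionId();
    StreamMetadataProvider provider = tableDataManager.getStreamMetadataProvider(segmentDataManager);
    // Single fetch for a one-element partition set.
    Map<Integer, StreamPartitionMsgOffset> latestOffsets =
        provider.fetchLatestStreamOffset(Set.of(partitionId), timeoutMs);
    // An empty map (see the Map.of() expectation above) means the latest offset is unknown.
    return latestOffsets == null ? null : latestOffsets.get(partitionId);
  }
}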
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
index 2248f731d2d..e63b10fafa0 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -26,7 +26,7 @@ import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.testng.annotations.Test;
@@ -52,8 +52,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -106,8 +106,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -166,8 +166,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
@@ -221,8 +221,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
// setup TableDataMangers
- TableDataManager tableDataManagerA = mock(TableDataManager.class);
- TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+ RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 630f69e3cf0..6502ea1a474 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -54,6 +54,10 @@ public abstract class StreamConsumerFactory {
*/
public abstract StreamMetadataProvider createStreamMetadataProvider(String clientId);
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId, boolean concurrentAccessExpected) {
+ return createStreamMetadataProvider(clientId);
+ }
+
public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
return new LongMsgOffsetFactory();
}
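The new two-argument overload above defaults to the existing single-argument factory method, so factories that do not override it keep their previous behavior. A hedged usage sketch follows; the StreamConsumerFactoryProvider entry point and the surrounding class/method names are assumptions for illustration, not part of this diff.

import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;

// Hedged sketch only; the factory-provider lookup is assumed, not shown in this commit.
public class ConcurrentProviderUsageSketch {
  static StreamMetadataProvider sharedProvider(StreamConfig streamConfig, String clientId) {
    StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig);
    // Passing true declares that several threads may call the provider concurrently;
    // factories that don't override the overload fall back to the single-argument method.
    return factory.createStreamMetadataProvider(clientId, true);
  }
}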
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 0ef42b8dde1..4b5666a7a35 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -55,6 +55,9 @@ public interface StreamMetadataProvider extends Closeable {
/**
* Fetches the latest offset for a set of given partition Ids.
+ * Implementations may not be thread-safe. Callers that need concurrent access
+ * should use a thread-safe wrapper.
+ *
* @param partitionIds partition Ids of the stream
* @param timeoutMillis fetch timeout
* @return latest {@link StreamPartitionMsgOffset} for each partition Id.
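Since the javadoc now states that implementations may not be thread-safe, a caller that shares a provider across threads needs a wrapper that serializes calls to it. The sketch below illustrates that delegation-under-lock pattern with a simplified stand-in interface (OffsetFetcher) rather than Pinot's real StreamMetadataProvider, which has more methods; it is an illustration, not the wrapper classes added in this commit.

import java.util.Map;
import java.util.Set;

// Hedged pattern sketch; OffsetFetcher is a simplified stand-in, not Pinot's StreamMetadataProvider.
public final class SynchronizedOffsetFetcherSketch {
  public interface OffsetFetcher {
    Map<Integer, Long> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis);
  }

  private final OffsetFetcher _delegate;
  private final Object _lock = new Object();

  public SynchronizedOffsetFetcherSketch(OffsetFetcher delegate) {
    _delegate = delegate;
  }

  public Map<Integer, Long> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
    // Serialize every call so concurrent callers never reach the non-thread-safe delegate at the same time.
    synchronized (_lock) {
      return _delegate.fetchLatestStreamOffset(partitionIds, timeoutMillis);
    }
  }
}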