This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 44b803ae8e Rename LLRealtimeSegmentDataManager to RealtimeSegmentDataManager (#11687) 44b803ae8e is described below commit 44b803ae8e3eee5cc191793b1ab4c35a682c6151 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Sep 26 21:08:21 2023 -0700 Rename LLRealtimeSegmentDataManager to RealtimeSegmentDataManager (#11687) --- ...anager.java => RealtimeSegmentDataManager.java} | 6 +- .../manager/realtime/RealtimeTableDataManager.java | 8 +- .../data/manager/realtime/SegmentCommitter.java | 2 +- .../manager/realtime/SplitSegmentCommitter.java | 2 +- ...st.java => RealtimeSegmentDataManagerTest.java} | 170 ++++++++++----------- .../pinot/server/api/resources/DebugResource.java | 4 +- .../pinot/server/api/resources/TablesResource.java | 8 +- .../FreshnessBasedConsumptionStatusChecker.java | 4 +- .../starter/helix/HelixInstanceDataManager.java | 12 +- .../IngestionBasedConsumptionStatusChecker.java | 8 +- .../helix/OffsetBasedConsumptionStatusChecker.java | 4 +- .../SegmentOnlineOfflineStateModelFactory.java | 6 +- ...FreshnessBasedConsumptionStatusCheckerTest.java | 40 ++--- .../OffsetBasedConsumptionStatusCheckerTest.java | 26 ++-- 14 files changed, 150 insertions(+), 150 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java similarity index 99% rename from pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java rename to pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 97c1fbfd82..e772eae7e8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -107,7 +107,7 @@ import org.slf4j.LoggerFactory; /** * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion. */ -public class LLRealtimeSegmentDataManager extends SegmentDataManager { +public class RealtimeSegmentDataManager extends SegmentDataManager { @VisibleForTesting public enum State { @@ -1314,7 +1314,7 @@ public class LLRealtimeSegmentDataManager extends SegmentDataManager { // Assume that this is called only on OFFLINE to CONSUMING transition. // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here. - public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, + public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager, @@ -1357,7 +1357,7 @@ public class LLRealtimeSegmentDataManager extends SegmentDataManager { _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore; _acquiredConsumerSemaphore = new AtomicBoolean(false); _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId; - _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); + _segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + streamTopic; if (_indexLoadingConfig.isRealtimeOffHeapAllocation() && !_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) { _memoryManager = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index e4b34d244d..88ca87ad62 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -436,12 +436,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager { PartitionDedupMetadataManager partitionDedupMetadataManager = _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null; - LLRealtimeSegmentDataManager llRealtimeSegmentDataManager = - new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), + RealtimeSegmentDataManager realtimeSegmentDataManager = + new RealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager, partitionDedupMetadataManager, _isTableReadyToConsumeData); - llRealtimeSegmentDataManager.startConsumption(); - segmentDataManager = llRealtimeSegmentDataManager; + realtimeSegmentDataManager.startConsumption(); + segmentDataManager = realtimeSegmentDataManager; _logger.info("Initialized RealtimeSegmentDataManager - " + segmentName); registerSegment(segmentName, segmentDataManager); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java index 922c54bd55..b9c1270e39 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java @@ -30,5 +30,5 @@ public interface SegmentCommitter { * @param segmentBuildDescriptor object that describes segment to be committed * @return */ - SegmentCompletionProtocol.Response commit(LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor); + SegmentCompletionProtocol.Response commit(RealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java index 7f3d3ea1fb..1e4ebfe1f8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java @@ -63,7 +63,7 @@ public class SplitSegmentCommitter implements SegmentCommitter { @Override public SegmentCompletionProtocol.Response commit( - LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) { + RealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) { File segmentTarFile = segmentBuildDescriptor.getSegmentTarFile(); SegmentCompletionProtocol.Response segmentCommitStartResponse = _protocolHandler.segmentCommitStart(_params); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java similarity index 87% rename from pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java rename to pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index b36059def7..1574892f42 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -77,8 +77,8 @@ import static org.mockito.Mockito.when; // TODO Re-write this test using the stream abstraction -public class LLRealtimeSegmentDataManagerTest { - private static final String SEGMENT_DIR = "/tmp/" + LLRealtimeSegmentDataManagerTest.class.getSimpleName(); +public class RealtimeSegmentDataManagerTest { + private static final String SEGMENT_DIR = "/tmp/" + RealtimeSegmentDataManagerTest.class.getSimpleName(); private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR); private static final String RAW_TABLE_NAME = "testTable"; private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); @@ -120,12 +120,12 @@ public class LLRealtimeSegmentDataManagerTest { return segmentZKMetadata; } - private FakeLLRealtimeSegmentDataManager createFakeSegmentManager() + private FakeRealtimeSegmentDataManager createFakeSegmentManager() throws Exception { return createFakeSegmentManager(false, new TimeSupplier(), null, null, null); } - private FakeLLRealtimeSegmentDataManager createFakeSegmentManager(boolean noUpsert, TimeSupplier timeSupplier, + private FakeRealtimeSegmentDataManager createFakeSegmentManager(boolean noUpsert, TimeSupplier timeSupplier, @Nullable String maxRows, @Nullable String maxDuration, @Nullable TableConfig tableConfig) throws Exception { SegmentZKMetadata segmentZKMetadata = createZkMetadata(); @@ -148,7 +148,7 @@ public class LLRealtimeSegmentDataManagerTest { _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new Semaphore(1)); Schema schema = Fixtures.createSchema(); ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); - return new FakeLLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, tableDataManager, SEGMENT_DIR, schema, + return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, tableDataManager, SEGMENT_DIR, schema, llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier); } @@ -169,8 +169,8 @@ public class LLRealtimeSegmentDataManagerTest { @Test public void testHolding() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); @@ -191,7 +191,7 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._buildSegmentCalled); Assert.assertFalse(segmentDataManager._commitSegmentCalled); Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); - Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.HOLDING); + Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), RealtimeSegmentDataManager.State.HOLDING); segmentDataManager.destroy(); } @@ -199,8 +199,8 @@ public class LLRealtimeSegmentDataManagerTest { @Test public void testCommitAfterHold() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); @@ -223,15 +223,15 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertTrue(segmentDataManager._commitSegmentCalled); Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), - LLRealtimeSegmentDataManager.State.COMMITTED); + RealtimeSegmentDataManager.State.COMMITTED); segmentDataManager.destroy(); } @Test public void testSegmentBuildException() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); @@ -243,7 +243,7 @@ public class LLRealtimeSegmentDataManagerTest { consumer.run(); Assert.assertTrue(segmentDataManager._buildSegmentCalled); - Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.ERROR); + Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), RealtimeSegmentDataManager.State.ERROR); segmentDataManager.destroy(); } @@ -251,8 +251,8 @@ public class LLRealtimeSegmentDataManagerTest { @Test public void testCommitAfterCatchup() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset firstOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); final LongMsgOffset catchupOffset = new LongMsgOffset(firstOffset.getOffset() + 10); // We should consume initially... @@ -287,7 +287,7 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertTrue(segmentDataManager._commitSegmentCalled); Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), - LLRealtimeSegmentDataManager.State.COMMITTED); + RealtimeSegmentDataManager.State.COMMITTED); segmentDataManager.destroy(); } @@ -297,9 +297,9 @@ public class LLRealtimeSegmentDataManagerTest { tableConfig.getIndexingConfig().getStreamConfigs() .put(StreamConfigProperties.constructStreamProperty( StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA, "fakeStream"), "2d"); - FakeLLRealtimeSegmentDataManager segmentDataManager = + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(false, new TimeSupplier(), null, null, tableConfig); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset firstOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); final LongMsgOffset catchupOffset = new LongMsgOffset(firstOffset.getOffset() + 10); // We should consume initially... @@ -334,7 +334,7 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertTrue(segmentDataManager._commitSegmentCalled); Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), - LLRealtimeSegmentDataManager.State.COMMITTED); + RealtimeSegmentDataManager.State.COMMITTED); segmentDataManager.destroy(); } @@ -344,9 +344,9 @@ public class LLRealtimeSegmentDataManagerTest { tableConfig.getIndexingConfig().getStreamConfigs() .put(StreamConfigProperties.constructStreamProperty( StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA, "fakeStream"), Instant.now().toString()); - FakeLLRealtimeSegmentDataManager segmentDataManager = + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(false, new TimeSupplier(), null, null, tableConfig); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset firstOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); final LongMsgOffset catchupOffset = new LongMsgOffset(firstOffset.getOffset() + 10); // We should consume initially... @@ -381,15 +381,15 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertTrue(segmentDataManager._commitSegmentCalled); Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), - LLRealtimeSegmentDataManager.State.COMMITTED); + RealtimeSegmentDataManager.State.COMMITTED); segmentDataManager.destroy(); } @Test public void testDiscarded() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); segmentDataManager._consumeOffsets.add(endOffset); final SegmentCompletionProtocol.Response discardResponse = new SegmentCompletionProtocol.Response( @@ -406,15 +406,15 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._commitSegmentCalled); Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), - LLRealtimeSegmentDataManager.State.DISCARDED); + RealtimeSegmentDataManager.State.DISCARDED); segmentDataManager.destroy(); } @Test public void testRetained() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); segmentDataManager._consumeOffsets.add(endOffset); SegmentCompletionProtocol.Response.Params params = new SegmentCompletionProtocol.Response.Params(); @@ -431,15 +431,15 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertTrue(segmentDataManager._buildAndReplaceCalled); Assert.assertFalse(segmentDataManager._commitSegmentCalled); - Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.RETAINED); + Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), RealtimeSegmentDataManager.State.RETAINED); segmentDataManager.destroy(); } @Test public void testNotLeader() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); @@ -459,15 +459,15 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._buildSegmentCalled); Assert.assertFalse(segmentDataManager._commitSegmentCalled); Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); - Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.HOLDING); + Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), RealtimeSegmentDataManager.State.HOLDING); segmentDataManager.destroy(); } @Test public void testConsumingException() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); segmentDataManager._throwExceptionFromConsume = true; segmentDataManager._postConsumeStoppedCalled = false; @@ -489,9 +489,9 @@ public class LLRealtimeSegmentDataManagerTest { metadata.setEndOffset(finalOffset.toString()); { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.COMMITTED); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.COMMITTED); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); @@ -499,9 +499,9 @@ public class LLRealtimeSegmentDataManagerTest { } { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.RETAINED); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.RETAINED); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); @@ -509,9 +509,9 @@ public class LLRealtimeSegmentDataManagerTest { } { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.DISCARDED); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.DISCARDED); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); @@ -519,9 +519,9 @@ public class LLRealtimeSegmentDataManagerTest { } { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.ERROR); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.ERROR); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); @@ -530,9 +530,9 @@ public class LLRealtimeSegmentDataManagerTest { // If holding, but we have overshot the expected final offset, the download and replace { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.HOLDING); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.HOLDING); segmentDataManager.setCurrentOffset(finalOffsetValue + 1); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); @@ -542,9 +542,9 @@ public class LLRealtimeSegmentDataManagerTest { // If catching up, but we have overshot the expected final offset, the download and replace { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CATCHING_UP); segmentDataManager.setCurrentOffset(finalOffsetValue + 1); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); @@ -554,9 +554,9 @@ public class LLRealtimeSegmentDataManagerTest { // If catching up, but we did not get to the final offset, then download and replace { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CATCHING_UP); segmentDataManager._consumeOffsets.add(new LongMsgOffset(finalOffsetValue - 1)); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); @@ -566,9 +566,9 @@ public class LLRealtimeSegmentDataManagerTest { // But then if we get to the exact offset, we get to build and replace, not download { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CATCHING_UP); segmentDataManager._consumeOffsets.add(finalOffset); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled); @@ -582,8 +582,8 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { // test reaching max row limit { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING); Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); segmentDataManager.setNumRowsIndexed(Fixtures.MAX_ROWS_IN_SEGMENT - 1); Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); @@ -594,8 +594,8 @@ public class LLRealtimeSegmentDataManagerTest { } // test reaching max time limit { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING); Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); // We should still get false because there is no messages fetched segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1); @@ -609,8 +609,8 @@ public class LLRealtimeSegmentDataManagerTest { } // In catching up state, test reaching final offset { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CATCHING_UP); final long finalOffset = START_OFFSET_VALUE + 100; segmentDataManager.setFinalOffset(finalOffset); segmentDataManager.setCurrentOffset(finalOffset - 1); @@ -621,9 +621,9 @@ public class LLRealtimeSegmentDataManagerTest { } // In catching up state, test reaching final offset ignoring time { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS); - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CATCHING_UP); final long finalOffset = START_OFFSET_VALUE + 100; segmentDataManager.setFinalOffset(finalOffset); segmentDataManager.setCurrentOffset(finalOffset - 1); @@ -635,9 +635,9 @@ public class LLRealtimeSegmentDataManagerTest { // When we go from consuming to online state, time and final offset matter. // Case 1. We have reached final offset. { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._timeSupplier.add(1); - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE); segmentDataManager.setConsumeEndTime(segmentDataManager._timeSupplier.get() + 10); final long finalOffset = START_OFFSET_VALUE + 100; segmentDataManager.setFinalOffset(finalOffset); @@ -649,8 +649,8 @@ public class LLRealtimeSegmentDataManagerTest { } // Case 2. We have reached time limit. { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE); final long endTime = segmentDataManager._timeSupplier.get() + 10; segmentDataManager.setConsumeEndTime(endTime); final long finalOffset = START_OFFSET_VALUE + 100; @@ -664,9 +664,9 @@ public class LLRealtimeSegmentDataManagerTest { } } - private void setHasMessagesFetched(FakeLLRealtimeSegmentDataManager segmentDataManager, boolean hasMessagesFetched) + private void setHasMessagesFetched(FakeRealtimeSegmentDataManager segmentDataManager, boolean hasMessagesFetched) throws Exception { - Field field = LLRealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched"); + Field field = RealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched"); field.setAccessible(true); field.set(segmentDataManager, hasMessagesFetched); } @@ -675,7 +675,7 @@ public class LLRealtimeSegmentDataManagerTest { @Test public void testReuseOfBuiltSegment() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); SegmentCompletionProtocol.Response.Params params = new SegmentCompletionProtocol.Response.Params(); params.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -712,7 +712,7 @@ public class LLRealtimeSegmentDataManagerTest { @Test public void testFileRemovedDuringOnlineTransition() throws Exception { - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); SegmentCompletionProtocol.Response.Params params = new SegmentCompletionProtocol.Response.Params(); params.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.FAILED); @@ -735,7 +735,7 @@ public class LLRealtimeSegmentDataManagerTest { SegmentZKMetadata metadata = new SegmentZKMetadata(SEGMENT_NAME_STR); metadata.setEndOffset(new LongMsgOffset(finalOffset).toString()); segmentDataManager._stopWaitTimeMs = 0; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.HOLDING); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.HOLDING); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertFalse(segmentTarFile.exists()); segmentDataManager.destroy(); @@ -745,13 +745,13 @@ public class LLRealtimeSegmentDataManagerTest { public void testOnlyOneSegmentHoldingTheSemaphoreForParticularPartition() throws Exception { long timeout = 10_000L; - FakeLLRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager(); + FakeRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager(); Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get()); Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore(); Assert.assertEquals(firstSemaphore.availablePermits(), 0); Assert.assertFalse(firstSemaphore.hasQueuedThreads()); - AtomicReference<FakeLLRealtimeSegmentDataManager> secondSegmentDataManager = new AtomicReference<>(null); + AtomicReference<FakeRealtimeSegmentDataManager> secondSegmentDataManager = new AtomicReference<>(null); // Construct the second segment manager, which will be blocked on the semaphore. Thread constructSecondSegmentManager = new Thread(() -> { @@ -837,13 +837,13 @@ public class LLRealtimeSegmentDataManagerTest { return now + TimeUnit.MINUTES.toMillis(segmentTimeThresholdMins + 1); } }; - FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier, + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier, String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 2), segmentTimeThresholdMins + "m", null); segmentDataManager._stubConsumeLoop = false; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS); segmentDataManager._consumeOffsets.add(endOffset); @@ -876,13 +876,13 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { final int segmentTimeThresholdMins = 10; TimeSupplier timeSupplier = new TimeSupplier(); - FakeLLRealtimeSegmentDataManager segmentDataManager = + FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier, String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS), segmentTimeThresholdMins + "m", null); segmentDataManager._stubConsumeLoop = false; - segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING); + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING); - LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); + RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS); segmentDataManager._consumeOffsets.add(endOffset); @@ -928,7 +928,7 @@ public class LLRealtimeSegmentDataManagerTest { } } - public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager { + public static class FakeRealtimeSegmentDataManager extends RealtimeSegmentDataManager { public Field _state; public Field _shouldStop; @@ -958,7 +958,7 @@ public class LLRealtimeSegmentDataManagerTest { return dataManagerConfig; } - public FakeLLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, + public FakeRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, Schema schema, LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, ServerMetrics serverMetrics, TimeSupplier timeSupplier) @@ -966,14 +966,14 @@ public class LLRealtimeSegmentDataManagerTest { super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName, semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null, () -> true); - _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state"); + _state = RealtimeSegmentDataManager.class.getDeclaredField("_state"); _state.setAccessible(true); - _shouldStop = LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop"); + _shouldStop = RealtimeSegmentDataManager.class.getDeclaredField("_shouldStop"); _shouldStop.setAccessible(true); - _stopReason = LLRealtimeSegmentDataManager.class.getDeclaredField("_stopReason"); + _stopReason = RealtimeSegmentDataManager.class.getDeclaredField("_stopReason"); _stopReason.setAccessible(true); _semaphoreMap = semaphoreMap; - _streamMsgOffsetFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory"); + _streamMsgOffsetFactory = RealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory"); _streamMsgOffsetFactory.setAccessible(true); _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory()); _timeSupplier = timeSupplier; @@ -1129,7 +1129,7 @@ public class LLRealtimeSegmentDataManagerTest { public boolean invokeEndCriteriaReached() { Method endCriteriaReached = null; try { - endCriteriaReached = LLRealtimeSegmentDataManager.class.getDeclaredMethod("endCriteriaReached"); + endCriteriaReached = RealtimeSegmentDataManager.class.getDeclaredMethod("endCriteriaReached"); endCriteriaReached.setAccessible(true); Boolean result = (Boolean) endCriteriaReached.invoke(this); return result; @@ -1149,7 +1149,7 @@ public class LLRealtimeSegmentDataManagerTest { private void setLong(long value, String fieldName) { try { - Field field = LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName); + Field field = RealtimeSegmentDataManager.class.getDeclaredField(fieldName); field.setAccessible(true); field.setLong(this, value); } catch (NoSuchFieldException e) { @@ -1161,7 +1161,7 @@ public class LLRealtimeSegmentDataManagerTest { private void setOffset(long value, String fieldName) { try { - Field field = LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName); + Field field = RealtimeSegmentDataManager.class.getDeclaredField(fieldName); field.setAccessible(true); StreamPartitionMsgOffset offset = (StreamPartitionMsgOffset) field.get(this); // if (offset == null) { @@ -1178,7 +1178,7 @@ public class LLRealtimeSegmentDataManagerTest { private void setInt(int value, String fieldName) { try { - Field field = LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName); + Field field = RealtimeSegmentDataManager.class.getDeclaredField(fieldName); field.setAccessible(true); field.setInt(this, value); } catch (NoSuchFieldException e) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java index e7b79c5156..5c666a28bd 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java @@ -45,7 +45,7 @@ import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.common.restlet.resources.SegmentServerDebugInfo; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.ImmutableSegment; @@ -166,7 +166,7 @@ public class DebugResource { private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDataManager, TableType tableType) { SegmentConsumerInfo segmentConsumerInfo = null; if (tableType == TableType.REALTIME) { - LLRealtimeSegmentDataManager realtimeSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager; + RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState(); Map<String, String> currentOffsets = realtimeSegmentDataManager.getPartitionToCurrentOffset(); Map<String, String> upstreamLatest = partitionStateMap.entrySet().stream().collect( diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 7999360044..b08833b966 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -76,7 +76,7 @@ import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -309,7 +309,7 @@ public class TablesResource { int totalSegmentCount = 0; Map<String, Map<String, Integer>> columnToIndexesCount = new HashMap<>(); for (SegmentDataManager segmentDataManager : allSegments) { - if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { + if (segmentDataManager instanceof RealtimeSegmentDataManager) { // REALTIME segments may not have indexes since not all indexes have mutable implementations continue; } @@ -685,8 +685,8 @@ public class TablesResource { List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments(); try { for (SegmentDataManager segmentDataManager : segmentDataManagers) { - if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { - LLRealtimeSegmentDataManager realtimeSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager; + if (segmentDataManager instanceof RealtimeSegmentDataManager) { + RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState(); Map<String, String> recordsLagMap = new HashMap<>(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java index abce5d5aaa..6f3610e596 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java @@ -21,7 +21,7 @@ package org.apache.pinot.server.starter.helix; import java.util.Set; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -63,7 +63,7 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum } @Override - protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataManager rtSegmentDataManager) { + protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager) { long now = now(); long latestIngestionTimestamp = rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index ba66798763..480c392db8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -53,8 +53,8 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.util.SegmentRefreshSemaphore; @@ -171,7 +171,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { File[] tableDataDirs = instanceDataDir.listFiles((dir, name) -> TableNameBuilder.isTableResource(name)); if (tableDataDirs != null) { for (File tableDataDir : tableDataDirs) { - File resourceTempDir = new File(tableDataDir, LLRealtimeSegmentDataManager.RESOURCE_TEMP_DIR_NAME); + File resourceTempDir = new File(tableDataDir, RealtimeSegmentDataManager.RESOURCE_TEMP_DIR_NAME); try { FileUtils.deleteDirectory(resourceTempDir); } catch (IOException e) { @@ -447,10 +447,10 @@ public class HelixInstanceDataManager implements InstanceDataManager { tableNameWithType); return; } - if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { + if (segmentDataManager instanceof RealtimeSegmentDataManager) { LOGGER.info("Reloading (force committing) consuming segment: {} in table: {}", segmentName, tableNameWithType); - ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit(); + ((RealtimeSegmentDataManager) segmentDataManager).forceCommit(); } return; } finally { @@ -600,8 +600,8 @@ public class HelixInstanceDataManager implements InstanceDataManager { SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segName); if (segmentDataManager != null) { try { - if (segmentDataManager instanceof LLRealtimeSegmentDataManager) { - ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit(); + if (segmentDataManager instanceof RealtimeSegmentDataManager) { + ((RealtimeSegmentDataManager) segmentDataManager).forceCommit(); } } finally { tableDataManager.releaseSegment(segmentDataManager); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java index 54667a43ac..83de35a63c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java @@ -23,7 +23,7 @@ import java.util.HashSet; import java.util.Set; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.TableType; @@ -66,7 +66,7 @@ public abstract class IngestionBasedConsumptionStatusChecker { segName); continue; } - if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) { + if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) { // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, // segment data manager will not be of type RealtimeSegmentDataManager. _logger.info("Segment {} is already committed and is considered caught up.", segName); @@ -74,7 +74,7 @@ public abstract class IngestionBasedConsumptionStatusChecker { continue; } - LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager; + RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; if (isSegmentCaughtUp(segName, rtSegmentDataManager)) { _caughtUpSegments.add(segName); } @@ -87,7 +87,7 @@ public abstract class IngestionBasedConsumptionStatusChecker { return _consumingSegments.size() - _caughtUpSegments.size(); } - protected abstract boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataManager rtSegmentDataManager); + protected abstract boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager); private TableDataManager getTableDataManager(String segmentName) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java index 5deafb2daf..6b597e3fa2 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java @@ -21,7 +21,7 @@ package org.apache.pinot.server.starter.helix; import java.util.Set; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -39,7 +39,7 @@ public class OffsetBasedConsumptionStatusChecker extends IngestionBasedConsumpti } @Override - protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataManager rtSegmentDataManager) { + protected boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager) { StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset(); StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime(); if (latestStreamOffset == null || latestIngestedOffset == null) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 1ad92ee96d..42d1642c7b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -30,7 +30,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.TableType; @@ -107,14 +107,14 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta // TODO: https://github.com/apache/pinot/issues/10049 try { - if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) { + if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) { // We found an LLC segment that is not consuming right now, must be that we already swapped it with a // segment that has been built. Nothing to do for this state transition. _logger.info("Segment {} not an instance of RealtimeSegmentDataManager. Reporting success for the transition", acquiredSegment.getSegmentName()); return; } - LLRealtimeSegmentDataManager segmentDataManager = (LLRealtimeSegmentDataManager) acquiredSegment; + RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment; SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName, segmentName); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java index cab178bc0d..6301b54d04 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java @@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableSet; import java.util.Set; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.SegmentMetadata; @@ -73,9 +73,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); @@ -119,7 +119,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest { assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0); } - private void setupLatestIngestionTimestamp(LLRealtimeSegmentDataManager segmentDataManager, + private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager segmentDataManager, long latestIngestionTimestamp) { MutableSegment mockSegment = mock(MutableSegment.class); SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class); @@ -148,9 +148,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); @@ -212,9 +212,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); @@ -285,9 +285,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); @@ -334,9 +334,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); @@ -392,9 +392,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java index 20ad08a11d..88b05b8ff0 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java @@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableSet; import java.util.Set; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; -import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.stream.LongMsgOffset; import org.testng.annotations.Test; @@ -53,9 +53,9 @@ public class OffsetBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); @@ -104,8 +104,8 @@ public class OffsetBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup some SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); @@ -120,7 +120,7 @@ public class OffsetBasedConsumptionStatusCheckerTest { assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3); // setup the remaining SegmentDataManager - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); // latest ingested offset latest stream offset @@ -161,9 +161,9 @@ public class OffsetBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); @@ -211,9 +211,9 @@ public class OffsetBasedConsumptionStatusCheckerTest { when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); // setup SegmentDataManagers - LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); - LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrA1 = mock(RealtimeSegmentDataManager.class); + RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class); when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org