This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit dcb2ee1354ff86417d28276ef03fbd0eaa758236 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Wed Jan 20 17:17:18 2021 -0800 Add tests for end-of-life cases --- .../realtime/PinotLLCRealtimeSegmentManager.java | 26 ++-- .../PinotLLCRealtimeSegmentManagerTest.java | 162 +++++++++++++++++---- .../realtime/LLRealtimeSegmentDataManagerTest.java | 10 +- 3 files changed, 155 insertions(+), 43 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 72caaf4..b137b5d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -515,21 +515,20 @@ public class PinotLLCRealtimeSegmentManager { // If there were no splits/merges we would receive A,B List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList); + Set<Integer> newPartitionGroupSet = + newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet()); int numPartitions = newPartitionGroupInfoList.size(); // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata String newConsumingSegmentName = null; String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); - for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) { - if (partitionGroupInfo.getPartitionGroupId() == committingSegmentPartitionGroupId) { - LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, - committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, - committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); - newConsumingSegmentName = newLLCSegment.getSegmentName(); - break; - } + if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) { + LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, + committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); + createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, + committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); + newConsumingSegmentName = newLLCSegment.getSegmentName(); } // TODO: create new partition groups also here @@ -943,7 +942,10 @@ public class PinotLLCRealtimeSegmentManager { * a) metadata status is IN_PROGRESS, segment state is CONSUMING - happy path * b) metadata status is IN_PROGRESS, segment state is OFFLINE - create new metadata and new CONSUMING segment * c) metadata status is DONE, segment state is OFFLINE - create new metadata and new CONSUMING segment - * d) metadata status is DONE, segment state is CONSUMING - create new metadata and new CONSUMING segment + * d) metadata status is DONE, segment state is CONSUMING - + * If shard not reached end of life, create new metadata and new CONSUMING segment. Update current segment to ONLINE in ideal state. + * If shard reached end of life, do not create new metadata and CONSUMING segment. Simply update current segment to ONLINE in ideal state + * * 2) Segment is absent from ideal state - add new segment to ideal state * * Also checks if it is too soon to correct (could be in the process of committing segment) @@ -985,8 +987,8 @@ public class PinotLLCRealtimeSegmentManager { // Possible things to repair: // 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING: // a. Create metadata for next segment and find hosts to assign it to. - // b. update current segment in idealstate to ONLINE - // c. add new segment in idealstate to CONSUMING on the hosts. + // b. update current segment in idealstate to ONLINE (only if partition is present in newPartitionGroupInfo) + // c. add new segment in idealstate to CONSUMING on the hosts (only if partition is present in newPartitionGroupInfo) // 2. The latest metadata is IN_PROGRESS, but segment is not there in idealstate. // a. change prev segment to ONLINE in idealstate // b. add latest segment to CONSUMING in idealstate. diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index ecbf2ef..e8309d3 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.helix.core.realtime; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -58,7 +59,6 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.LongMsgOffset; -import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; @@ -249,6 +249,49 @@ public class PinotLLCRealtimeSegmentManagerTest { } catch (IllegalStateException e) { // Expected } + + // committing segment's partitionGroupId no longer in the newPartitionGroupInfoList + List<PartitionGroupInfo> partitionGroupInfoListWithout0 = + segmentManager.getPartitionGroupInfoList(segmentManager._streamConfig, Collections.emptyList()); + partitionGroupInfoListWithout0.remove(0); + segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout0; + + // Commit a segment for partition 0 - No new entries created for partition which reached end of life + committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName(); + String committingSegmentStartOffset = segmentManager._segmentZKMetadataMap.get(committingSegment).getStartOffset(); + String committingSegmentEndOffset = + new LongMsgOffset(Long.parseLong(committingSegmentStartOffset) + NUM_DOCS).toString(); + committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment, committingSegmentEndOffset, 0L); + committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata()); + int instanceStateMapSize = instanceStatesMap.size(); + int metadataMapSize = segmentManager._segmentZKMetadataMap.size(); + segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor); + // No changes in the number of ideal state or zk entries + assertEquals(instanceStatesMap.size(), instanceStateMapSize); + assertEquals(segmentManager._segmentZKMetadataMap.size(), metadataMapSize); + + // Verify instance states for committed segment and new consuming segment + committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment); + assertNotNull(committedSegmentInstanceStateMap); + assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()), + Collections.singleton(SegmentStateModel.ONLINE)); + + consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName(); + consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment); + assertNull(consumingSegmentInstanceStateMap); + + // Verify segment ZK metadata for committed segment and new consuming segment + committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment); + assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE); + assertEquals(committedSegmentZKMetadata.getStartOffset(), committingSegmentStartOffset); + assertEquals(committedSegmentZKMetadata.getEndOffset(), committingSegmentEndOffset); + assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS); + assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC)); + assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION); + assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS); + + consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment); + assertNull(consumingSegmentZKMetadata); } /** @@ -411,6 +454,20 @@ public class PinotLLCRealtimeSegmentManagerTest { * * 4. MaxSegmentCompletionTime: Segment completion has 5 minutes to retry and complete between steps 1 and 3. * Correction: Do not correct the segments before the allowed time for segment completion + * + * + * End-of-shard case: + * Additionally, shards of some streams may be detected as reached end-of-life when committing. + * In such cases, step 2 is skipped, and step 3 is done partially (change committing segment state to ONLINE + * but don't create new segment with state CONSUMING) + * + * Scenarios: + * 1. Step 3 failed - we will find segment ZK metadata DONE, but ideal state CONSUMING + * Correction: Since shard has ended, do not create new segment ZK metadata, or new entry in ideal state. + * Simply update CONSUMING segment in ideal state to ONLINE + * + * 2. Shard which has reached EOL detected - we will find segment ZK metadata DONE and ideal state ONLINE + * Correction: No repair needed. Acceptable case. */ @Test public void testRepairs() { @@ -422,12 +479,12 @@ public class PinotLLCRealtimeSegmentManagerTest { // Remove the CONSUMING segment from the ideal state for partition 0 (step 3 failed) String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); removeNewConsumingSegment(instanceStatesMap, consumingSegment, null); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed) removeNewConsumingSegment(instanceStatesMap, consumingSegment, null); assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment)); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // 2 partitions commit segment for (int partitionId = 0; partitionId < 2; partitionId++) { @@ -442,12 +499,12 @@ public class PinotLLCRealtimeSegmentManagerTest { consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName(); String latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed) removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment); assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment)); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); /* Test all replicas of the new segment are OFFLINE @@ -461,12 +518,12 @@ public class PinotLLCRealtimeSegmentManagerTest { // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 again consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName(); turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // 2 partitions commit segment for (int partitionId = 0; partitionId < 2; partitionId++) { @@ -484,22 +541,51 @@ public class PinotLLCRealtimeSegmentManagerTest { consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName(); latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName(); removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed) removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment); assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment)); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName(); turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 again consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 4, CURRENT_TIME_MS).getSegmentName(); turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment); - testRepairs(segmentManager); + testRepairs(segmentManager, Collections.emptyList()); + + /* + * End of shard cases + */ + // 1 reached end of shard. + List<PartitionGroupInfo> partitionGroupInfoListWithout1 = + segmentManager.getPartitionGroupInfoList(segmentManager._streamConfig, Collections.emptyList()); + partitionGroupInfoListWithout1.remove(1); + segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout1; + // noop + testRepairs(segmentManager, Collections.emptyList()); + + // 1 commits segment - should not create new metadata or CONSUMING segment + String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, CURRENT_TIME_MS).getSegmentName(); + String startOffset = segmentManager._segmentZKMetadataMap.get(segmentName).getStartOffset(); + CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName, + new LongMsgOffset(Long.parseLong(startOffset) + NUM_DOCS).toString(), 0L); + committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata()); + segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor); + // ONLINE in IS and metadata DONE, but end of shard (not present in partition group list), so don't repair + testRepairs(segmentManager, Lists.newArrayList(1)); + + // make the last ONLINE segment of the shard as CONSUMING (failed between step1 and 3) + segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout1; + consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, CURRENT_TIME_MS).getSegmentName(); + turnNewConsumingSegmentConsuming(instanceStatesMap, consumingSegment); + + // makes the IS to ONLINE, but creates no new entries, because end of shard. + testRepairs(segmentManager, Lists.newArrayList(1)); } /** @@ -539,7 +625,19 @@ public class PinotLLCRealtimeSegmentManagerTest { } } - private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager) { + /** + * Turns all instances for the segment to CONSUMING in the ideal state. + */ + private void turnNewConsumingSegmentConsuming(Map<String, Map<String, String>> instanceStatesMap, + String consumingSegment) { + Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment); + assertNotNull(consumingSegmentInstanceStateMap); + for (Map.Entry<String, String> entry : consumingSegmentInstanceStateMap.entrySet()) { + entry.setValue(SegmentStateModel.CONSUMING); + } + } + + private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager, List<Integer> shardsEnded) { Map<String, Map<String, String>> oldInstanceStatesMap = cloneInstanceStatesMap(segmentManager._idealState.getRecord().getMapFields()); segmentManager._exceededMaxSegmentCompletionTime = false; @@ -547,7 +645,7 @@ public class PinotLLCRealtimeSegmentManagerTest { verifyNoChangeToOldEntries(segmentManager, oldInstanceStatesMap); segmentManager._exceededMaxSegmentCompletionTime = true; segmentManager.ensureAllPartitionsConsuming(); - verifyRepairs(segmentManager); + verifyRepairs(segmentManager, shardsEnded); } /** @@ -564,7 +662,7 @@ public class PinotLLCRealtimeSegmentManagerTest { } } - private void verifyRepairs(FakePinotLLCRealtimeSegmentManager segmentManager) { + private void verifyRepairs(FakePinotLLCRealtimeSegmentManager segmentManager, List<Integer> shardsEnded) { Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields(); // Segments are the same for ideal state and ZK metadata @@ -597,16 +695,18 @@ public class PinotLLCRealtimeSegmentManagerTest { int numSegments = segments.size(); String latestSegment = segments.get(numSegments - 1); - - // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegment); - assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)); - assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE)); - - // Latest segment ZK metadata should be IN_PROGRESS - assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS); + if (!shardsEnded.contains(partitionId)) { + // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state + assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)); + assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE)); + + // Latest segment ZK metadata should be IN_PROGRESS + assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS); + numSegments --; + } - for (int i = 0; i < numSegments - 1; i++) { + for (int i = 0; i < numSegments; i++) { String segmentName = segments.get(i); // Committed segment should have all instances in ONLINE state @@ -620,8 +720,13 @@ public class PinotLLCRealtimeSegmentManagerTest { // Verify segment start/end offset assertEquals(segmentZKMetadata.getStartOffset(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + i * (long) NUM_DOCS).toString()); - assertEquals(segmentZKMetadata.getEndOffset(), - segmentManager._segmentZKMetadataMap.get(segments.get(i + 1)).getStartOffset()); + if (shardsEnded.contains(partitionId) && ((i + 1) == numSegments)) { + assertEquals(Long.parseLong(segmentZKMetadata.getEndOffset()), + Long.parseLong(segmentZKMetadata.getStartOffset()) + NUM_DOCS); + } else { + assertEquals(segmentZKMetadata.getEndOffset(), + segmentManager._segmentZKMetadataMap.get(segments.get(i + 1)).getStartOffset()); + } } } } @@ -818,6 +923,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, Integer> _segmentZKMetadataVersionMap = new HashMap<>(); IdealState _idealState; int _numPartitions; + List<PartitionGroupInfo> _partitionGroupInfoList = null; boolean _exceededMaxSegmentCompletionTime = false; FakePinotLLCRealtimeSegmentManager() { @@ -919,9 +1025,11 @@ public class PinotLLCRealtimeSegmentManagerTest { @Override List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { - return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, - PARTITION_OFFSET)) - .collect(Collectors.toList()); + if (_partitionGroupInfoList != null) { + return _partitionGroupInfoList; + } else { + return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, PARTITION_OFFSET)).collect(Collectors.toList()); + } } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index d7aec8d..ae8b138 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -35,6 +35,7 @@ import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; +import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig; import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl; @@ -147,6 +148,7 @@ public class LLRealtimeSegmentDataManagerTest { segmentZKMetadata.setSegmentName(_segmentNameStr); segmentZKMetadata.setStartOffset(_startOffset.toString()); segmentZKMetadata.setCreationTime(System.currentTimeMillis()); + segmentZKMetadata.setStatus(Status.IN_PROGRESS); return segmentZKMetadata; } @@ -771,7 +773,7 @@ public class LLRealtimeSegmentDataManagerTest { public Field _state; public Field _shouldStop; public Field _stopReason; - private Field _streamMsgOffsetFactory; + private final Field _checkpointFactory; public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>(); public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>(); public boolean _commitSegmentCalled = false; @@ -810,9 +812,9 @@ public class LLRealtimeSegmentDataManagerTest { _stopReason = LLRealtimeSegmentDataManager.class.getDeclaredField("_stopReason"); _stopReason.setAccessible(true); _semaphoreMap = semaphoreMap; - _streamMsgOffsetFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory"); - _streamMsgOffsetFactory.setAccessible(true); - _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory()); + _checkpointFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_checkpointFactory"); + _checkpointFactory.setAccessible(true); + _checkpointFactory.set(this, new LongMsgOffsetFactory()); } public String getStopReason() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org