This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d46edb089 Kinesis partition split fixes (#15563)
3d46edb089 is described below

commit 3d46edb089325860a4c1d1f005dfb2d74139539f
Author: Krishan Goyal <kris...@startree.ai>
AuthorDate: Tue Apr 29 13:47:14 2025 +0530

    Kinesis partition split fixes (#15563)
    
    * Initial fixes for issues related to kinesis partition split
    
    * Refactor kinesis tests to make it easy to add more tests
    
    * Created a test case for shard increase and fixed a bug related to end of consumption
    
    * Added more tests covering split / merge combinations with pause / resume / RVM triggers on an old / new table
    
    * Checkstyle fixes
    
    * Small refactors and an attempt to always consume from the ZK offset
    
    * Fix kafka regression with a workaround flag.
    
    * Add overridden function for test case
    
    * Add more testing around largest offset and concurrent pause / resume functionality
    
    * Avoid overriding pulsar behaviour to continue with current behaviour for now
    
    * Improving some documentation
    
    * Address PR comments
    
    * Retry message fetch outside kinesis consumer
    
    * Checkstyle fixes
---
 .../helix/core/PinotTableIdealStateBuilder.java    |   7 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   2 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  39 +-
 .../pinot/controller/helix/ControllerTest.java     |  88 +++++
 .../PinotLLCRealtimeSegmentManagerTest.java        |   7 +
 .../realtime/RealtimeSegmentDataManager.java       |   5 +-
 .../tests/BaseClusterIntegrationTest.java          |  10 +-
 .../ingestion/BaseKinesisIntegrationTest.java      | 236 ++++++++++++
 ...aIncreaseDecreasePartitionsIntegrationTest.java |  67 +---
 .../realtime/ingestion/KinesisShardChangeTest.java | 425 +++++++++++++++++++++
 .../ingestion}/RealtimeKinesisIntegrationTest.java | 212 +---------
 .../realtime/ingestion/utils/KinesisUtils.java     | 112 ++++++
 .../plugin/stream/kinesis/KinesisConsumer.java     |  11 +
 .../kinesis/KinesisStreamMetadataProvider.java     |  42 +-
 .../kinesis/KinesisStreamMetadataProviderTest.java |  32 ++
 .../FreshnessBasedConsumptionStatusChecker.java    |  12 +-
 .../IngestionBasedConsumptionStatusChecker.java    |  21 +
 .../helix/OffsetBasedConsumptionStatusChecker.java |  21 +-
 .../spi/stream/PartitionGroupMetadataFetcher.java  |   9 +-
 .../pinot/spi/stream/StreamMetadataProvider.java   |  21 +
 .../utils/builder/ControllerRequestURLBuilder.java |   5 +
 21 files changed, 1090 insertions(+), 294 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 8895d9df50..244f7853d8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -85,11 +85,12 @@ public class PinotTableIdealStateBuilder {
    *                                            partition groups.
    *                                          The size of this list is equal 
to the number of partition groups,
    *                                          and is created using the latest 
segment zk metadata.
+   * @param forceGetOffsetFromStream - details in 
PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
    */
   public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
-    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfigs, 
partitionGroupConsumptionStatusList);
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
+    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new 
PartitionGroupMetadataFetcher(
+        streamConfigs, partitionGroupConsumptionStatusList, 
forceGetOffsetFromStream);
     try {
       
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
       return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index 5fe2ffe6d6..efc43246b7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -81,7 +81,7 @@ public class MissingConsumingSegmentFinder {
       return streamConfig;
     });
     try {
-      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList())
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(), false)
           .forEach(metadata -> {
             
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
           });
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index cead9ddfd4..765b25852a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1001,6 +1001,9 @@ public class PinotLLCRealtimeSegmentManager {
         partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
             .map(partitionId -> 
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, 
index))
             .collect(Collectors.toSet()));
+      } catch (UnsupportedOperationException ignored) {
+        allPartitionIdsFetched = false;
+        // Stream does not support fetching partition ids. The fallback code logs this, which is sufficient.
       } catch (Exception e) {
         allPartitionIdsFetched = false;
         LOGGER.warn("Failed to fetch partition ids for stream: {}", 
streamConfigs.get(i).getTopicName(), e);
@@ -1035,7 +1038,20 @@ public class PinotLLCRealtimeSegmentManager {
   List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
     return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
-        currentPartitionGroupConsumptionStatusList);
+        currentPartitionGroupConsumptionStatusList, false);
+  }
+
+  /**
+   * Fetches the latest state of the PartitionGroups for the stream.
+   * If a partition has reached end of life and all of its messages have been consumed by the segment,
+   * it will be skipped from the result.
+   */
+  @VisibleForTesting
+  List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList,
+      boolean forceGetOffsetFromStream) {
+    return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
+        currentPartitionGroupConsumptionStatusList, forceGetOffsetFromStream);
   }
 
   /**
@@ -1460,7 +1476,7 @@ public class PinotLLCRealtimeSegmentManager {
     }
     // Create a map from partition id to the smallest stream offset
     Map<Integer, StreamPartitionMsgOffset> partitionIdToSmallestOffset = null;
-    if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) {
+    if (offsetCriteria != null && 
offsetCriteria.equals(OffsetCriteria.SMALLEST_OFFSET_CRITERIA)) {
       partitionIdToSmallestOffset = partitionIdToStartOffset;
     }
 
@@ -1553,11 +1569,13 @@ public class PinotLLCRealtimeSegmentManager {
 
           // Smallest offset is fetched from stream once and cached in 
partitionIdToSmallestOffset.
           if (partitionIdToSmallestOffset == null) {
-            partitionIdToSmallestOffset = 
fetchPartitionGroupIdToSmallestOffset(streamConfigs);
+            partitionIdToSmallestOffset = 
fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
           }
 
           // Do not create new CONSUMING segment when the stream partition has 
reached end of life.
           if (!partitionIdToSmallestOffset.containsKey(partitionId)) {
+            LOGGER.info("PartitionGroup: {} has reached end of life. Skipping 
creation of new segment {}",
+                partitionId, latestSegmentName);
             continue;
           }
 
@@ -1651,13 +1669,24 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private Map<Integer, StreamPartitionMsgOffset> 
fetchPartitionGroupIdToSmallestOffset(
-      List<StreamConfig> streamConfigs) {
+      List<StreamConfig> streamConfigs, IdealState idealState) {
     Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = 
new HashMap<>();
     for (StreamConfig streamConfig : streamConfigs) {
+      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
+          getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
       OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
       streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+
+      // The Kinesis shard-split flow requires passing currentPartitionGroupConsumptionStatusList so that we can
+      // check whether a shard has been completely consumed. However, the Kafka implementation of
+      // computePartitionGroupMetadata() breaks if we pass the current status: streamSmallestOffset ends up null
+      // in the selectStartOffset() method. The overall dependency isn't clean and needs a refactor. As a
+      // temporary workaround, we pass a boolean flag indicating whether to use the current status: the Kafka
+      // implementation of computePartitionGroupMetadata() ignores it, while the Kinesis implementation uses it.
       List<PartitionGroupMetadata> partitionGroupMetadataList =
-          getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList());
+          getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, true);
       streamConfig.setOffsetCriteria(originalOffsetCriteria);
       for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
         partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
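
A rough, self-contained sketch of the contract described in the comment above: the smallest start offset is collected per partition group after forcing the smallest offset criteria, and a partition that is absent from the resulting map is treated as end-of-life, so no new CONSUMING segment is created for it. PartitionOffset and PartitionMetadata below are illustrative stand-ins, not Pinot's StreamPartitionMsgOffset / PartitionGroupMetadata classes.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in types for illustration only.
record PartitionOffset(String value) { }
record PartitionMetadata(int partitionGroupId, PartitionOffset startOffset) { }

final class SmallestOffsetSketch {
  // Collect the smallest start offset per partition group, mirroring fetchPartitionGroupIdToSmallestOffset()
  // above, which fetches metadata with forceGetOffsetFromStream = true under SMALLEST_OFFSET_CRITERIA.
  static Map<Integer, PartitionOffset> toSmallestOffsetMap(List<PartitionMetadata> fetched) {
    Map<Integer, PartitionOffset> partitionIdToSmallestOffset = new HashMap<>();
    for (PartitionMetadata metadata : fetched) {
      partitionIdToSmallestOffset.put(metadata.partitionGroupId(), metadata.startOffset());
    }
    return partitionIdToSmallestOffset;
  }

  // A partition absent from the map has reached end of life and is fully consumed, so the caller skips it.
  static boolean shouldCreateNewConsumingSegment(Map<Integer, PartitionOffset> smallestOffsets, int partitionId) {
    return smallestOffsets.containsKey(partitionId);
  }
}
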
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index e92a185985..a5b05031c6 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -65,6 +66,8 @@ import org.apache.pinot.controller.BaseControllerStarter;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerStarter;
 import org.apache.pinot.controller.api.access.AllowAllAccessFactory;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
+import org.apache.pinot.controller.api.resources.TableViews;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -76,6 +79,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -91,6 +95,9 @@ import static org.testng.Assert.*;
 
 
 public class ControllerTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerTest.class);
+
   public static final String LOCAL_HOST = "localhost";
   public static final String DEFAULT_DATA_DIR = new 
File(FileUtils.getTempDirectoryPath(),
       "test-controller-data-dir" + 
System.currentTimeMillis()).getAbsolutePath();
@@ -824,6 +831,87 @@ public class ControllerTest {
     }
   }
 
+  public void runRealtimeSegmentValidationTask(String tableName)
+      throws IOException {
+    runPeriodicTask("RealtimeSegmentValidationManager", tableName, 
TableType.REALTIME);
+  }
+
+  public void runPeriodicTask(String taskName, String tableName, TableType 
tableType)
+      throws IOException {
+    sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName, 
tableName, tableType));
+  }
+
+  public void pauseTable(String tableName)
+      throws IOException {
+    
sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName));
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        PauseStatusDetails pauseStatusDetails =
+            
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+                PauseStatusDetails.class);
+        if (pauseStatusDetails.getConsumingSegments().isEmpty()) {
+          return true;
+        }
+        LOGGER.warn("Table not yet paused. Response " + pauseStatusDetails);
+        return false;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 2000, 60_000L, "Failed to pause table: " + tableName);
+  }
+
+  public void resumeTable(String tableName)
+      throws IOException {
+    resumeTable(tableName, "lastConsumed");
+  }
+
+  public void resumeTable(String tableName, String offsetCriteria)
+      throws IOException {
+    
sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName)
+        + "?consumeFrom=" + offsetCriteria);
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        PauseStatusDetails pauseStatusDetails =
+            
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+                PauseStatusDetails.class);
+        // It's possible no segment is in consuming state, so check the pause flag
+        if (!pauseStatusDetails.getPauseFlag()) {
+          return true;
+        }
+        LOGGER.warn("Pause flag is not yet set to false. Response " + 
pauseStatusDetails);
+        return false;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 2000, 60_000L, "Failed to resume table: " + tableName);
+  }
+
+  public void waitForNumSegmentsInDesiredStateInEV(String tableName, String 
desiredState,
+      int desiredNumConsumingSegments, TableType type) {
+    TestUtils.waitForCondition((aVoid) -> {
+          try {
+            AtomicInteger numConsumingSegments = new AtomicInteger(0);
+            TableViews.TableView tableView = getExternalView(tableName, type);
+            Map<String, Map<String, String>> viewForType =
+                type.equals(TableType.OFFLINE) ? tableView._offline : 
tableView._realtime;
+            viewForType.values().forEach((v) -> {
+              numConsumingSegments.addAndGet((int) 
v.values().stream().filter((v1) -> v1.equals(desiredState)).count());
+            });
+            return numConsumingSegments.get() == desiredNumConsumingSegments;
+          } catch (IOException e) {
+            return false;
+          }
+        }, 5000, 60_000L,
+        "Failed to wait for " + desiredNumConsumingSegments + " consuming 
segments for table: " + tableName
+    );
+  }
+
+  public TableViews.TableView getExternalView(String tableName, TableType type)
+      throws IOException {
+    String state = 
sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName + "_" 
+ type));
+    return JsonUtils.stringToObject(state, TableViews.TableView.class);
+  }
+
   public static String sendGetRequest(String urlString)
       throws IOException {
     return sendGetRequest(urlString, null);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index a86cf62e2e..25c286d3a2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1832,6 +1832,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
       }
     }
 
+    @Override
+    List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList,
+        boolean forceGetOffsetFromStream) {
+      return getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+    }
+
     @Override
     protected boolean isExceededMaxSegmentCompletionTime(String 
realtimeTableName, String segmentName,
         long currentTimeMs) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d49b8484a6..ffda64c0c7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -479,7 +479,10 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               messageBatch.getMessageCount(), 
messageBatch.getUnfilteredMessageCount(),
               messageBatch.isEndOfPartitionGroup());
         }
-        _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
+        // We need to check both endOfPartitionGroup and messageCount == 0, because endOfPartitionGroup can be
+        // true even when the batch still contains the last messages (observed for Kinesis). To ensure that last
+        // batch is processed, keep _endOfPartitionGroup false in that case.
+        _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();
         _consecutiveErrorCount = 0;
       } catch (PermanentConsumerException e) {
         
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
 1L);
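
The new assignment above only marks a partition group as finished once a fetched batch is both flagged as end-of-partition-group and empty. A minimal sketch of that rule, with BatchView as a hypothetical stand-in for the message-batch API used by the consumer loop:

// Hypothetical stand-in for the message-batch view; not Pinot's actual MessageBatch interface.
interface BatchView {
  int getMessageCount();
  boolean isEndOfPartitionGroup();
}

final class EndOfPartitionGroupSketch {
  // A split or merged Kinesis shard can report end-of-partition-group on a batch that still carries its final
  // messages; only stop consuming once the flag is set and the batch is empty, so that last batch is indexed.
  static boolean reachedEndOfPartitionGroup(BatchView batch) {
    return batch.getMessageCount() == 0 && batch.isEndOfPartitionGroup();
  }
}
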
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index f8e11e7c5f..3156df8f65 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -366,7 +366,12 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
    */
   protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
     AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
-    return new TableConfigBuilder(TableType.REALTIME)
+    return getTableConfigBuilder(TableType.REALTIME).build();
+  }
+
+  // TODO - Use this method to create table config for all table types to 
avoid redundant code
+  protected TableConfigBuilder getTableConfigBuilder(TableType tableType) {
+    return new TableConfigBuilder(tableType)
         .setTableName(getTableName())
         .setTimeColumnName(getTimeColumnName())
         .setSortedColumn(getSortedColumn())
@@ -384,8 +389,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         .setIngestionConfig(getIngestionConfig())
         .setQueryConfig(getQueryConfig())
         .setStreamConfigs(getStreamConfigs())
-        .setNullHandlingEnabled(getNullHandlingEnabled())
-        .build();
+        .setNullHandlingEnabled(getNullHandlingEnabled());
   }
 
   /**
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java
new file mode 100644
index 0000000000..7842230084
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
+import cloud.localstack.Localstack;
+import cloud.localstack.ServiceName;
+import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
+import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
+import cloud.localstack.docker.annotation.LocalstackDockerProperties;
+import cloud.localstack.docker.command.Command;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import 
org.apache.pinot.integration.tests.realtime.ingestion.utils.KinesisUtils;
+import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
+import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.SkipException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.utils.AttributeMap;
+
+
+/**
+ * Creates all dependencies (docker image, kinesis server, kinesis client, 
configs) for all tests requiring kinesis
+ */
+@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = 
BaseKinesisIntegrationTest.LOCALSTACK_IMAGE)
+abstract class BaseKinesisIntegrationTest extends BaseClusterIntegrationTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseKinesisIntegrationTest.class);
+
+  static final String LOCALSTACK_IMAGE = "2.3.2";
+  private static final LocalstackDockerAnnotationProcessor PROCESSOR = new 
LocalstackDockerAnnotationProcessor();
+  private final Localstack _localstackDocker = Localstack.INSTANCE;
+  protected KinesisClient _kinesisClient;
+
+  private static final String REGION = "us-east-1";
+  private static final String LOCALSTACK_KINESIS_ENDPOINT = 
"http://localhost:4566";;
+  protected static final String STREAM_TYPE = "kinesis";
+  protected static final String STREAM_NAME = "kinesis-test";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    try {
+      DockerInfoCommand dockerInfoCommand = new DockerInfoCommand();
+      dockerInfoCommand.execute();
+    } catch (IllegalStateException e) {
+      LOGGER.warn("Skipping kinesis tests! Docker is not found running", e);
+      throw new SkipException(e.getMessage());
+    }
+
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    startKinesis();
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    stopKinesis();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  protected void createStream(int numShards) {
+    LOGGER.warn("Stream " + STREAM_NAME + " being created");
+    
_kinesisClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(numShards).build());
+
+    TestUtils.waitForCondition(aVoid ->
+            KinesisUtils.isKinesisStreamActive(_kinesisClient, STREAM_NAME), 
2000L, 60000,
+        "Kinesis stream " + STREAM_NAME + " is not created or is not in active 
state", true);
+  }
+
+  protected void deleteStream() {
+    try {
+      
_kinesisClient.deleteStream(DeleteStreamRequest.builder().streamName(STREAM_NAME).build());
+    } catch (ResourceNotFoundException ignored) {
+      return;
+    }
+    TestUtils.waitForCondition(aVoid -> {
+          try {
+            KinesisUtils.getKinesisStreamStatus(_kinesisClient, STREAM_NAME);
+          } catch (ResourceNotFoundException e) {
+            return true;
+          }
+          return false;
+        }, 2000L, 60000,
+        "Kinesis stream " + STREAM_NAME + " is not deleted", true);
+
+    LOGGER.warn("Stream " + STREAM_NAME + " deleted");
+  }
+
+  protected PutRecordResponse putRecord(String data, String partitionKey) {
+    PutRecordRequest putRecordRequest =
+        
PutRecordRequest.builder().streamName(STREAM_NAME).data(SdkBytes.fromUtf8String(data))
+            .partitionKey(partitionKey).build();
+    return _kinesisClient.putRecord(putRecordRequest);
+  }
+
+  @Override
+  public Map<String, String> getStreamConfigs() {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    String streamType = STREAM_TYPE;
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+        StreamConfigProperties.STREAM_TOPIC_NAME), STREAM_NAME);
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+        StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), "30000");
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), 
KinesisConsumerFactory.class.getName());
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, 
StreamConfigProperties.STREAM_DECODER_CLASS),
+        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
+    streamConfigMap.put(KinesisConfig.REGION, REGION);
+    streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
+    streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT);
+    streamConfigMap.put(KinesisConfig.ACCESS_KEY, 
getLocalAWSCredentials().resolveCredentials().accessKeyId());
+    streamConfigMap.put(KinesisConfig.SECRET_KEY, 
getLocalAWSCredentials().resolveCredentials().secretAccessKey());
+    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
Integer.toString(2000));
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+    return streamConfigMap;
+  }
+
+  @Override
+  public TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+    // Calls the super class to create the table config.
+    // Properties like stream configs are overridden in the getStreamConfigs() method.
+    return super.createRealtimeTableConfig(sampleAvroFile);
+  }
+
+  private void stopKinesis() {
+    if (_kinesisClient != null) {
+      _kinesisClient.close();
+    }
+    if (_localstackDocker.isRunning()) {
+      _localstackDocker.stop();
+    }
+  }
+
+  private void startKinesis()
+      throws Exception {
+    LocalstackDockerConfiguration dockerConfig = 
PROCESSOR.process(this.getClass());
+    StopAllLocalstackDockerCommand stopAllLocalstackDockerCommand = new 
StopAllLocalstackDockerCommand();
+    stopAllLocalstackDockerCommand.execute();
+    _localstackDocker.startup(dockerConfig);
+
+    _kinesisClient = KinesisClient.builder().httpClient(new 
ApacheSdkHttpService().createHttpClientBuilder()
+            .buildWithDefaults(
+                
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, 
Boolean.TRUE).build()))
+        
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION))
+        .endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
+  }
+
+  private static class StopAllLocalstackDockerCommand extends Command {
+
+    public void execute() {
+      String runningDockerContainers =
+          dockerExe.execute(
+              Arrays.asList("ps", "-a", "-q", "-f", 
"ancestor=localstack/localstack:" + LOCALSTACK_IMAGE));
+      if (StringUtils.isNotBlank(runningDockerContainers) && 
!runningDockerContainers.toLowerCase().contains("error")) {
+        String[] containerList = runningDockerContainers.split("\n");
+
+        for (String containerId : containerList) {
+          dockerExe.execute(Arrays.asList("stop", containerId));
+        }
+      }
+    }
+  }
+
+  private static class DockerInfoCommand extends Command {
+
+    public void execute() {
+      String dockerInfo = dockerExe.execute(Collections.singletonList("info"));
+
+      if (dockerInfo.toLowerCase().contains("error")) {
+        throw new IllegalStateException("Docker daemon is not running!");
+      }
+    }
+  }
+
+  private static AwsCredentialsProvider getLocalAWSCredentials() {
+    return 
StaticCredentialsProvider.create(AwsBasicCredentials.create("access", 
"secret"));
+  }
+}
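
KinesisUtils.isKinesisStreamActive and KinesisUtils.getKinesisStreamStatus are called above, but the KinesisUtils helper added under realtime/ingestion/utils in this commit is not part of this excerpt. A plausible sketch of those two helpers using the AWS SDK v2 describeStream call is shown below; the class and method bodies are assumptions inferred from the call sites, not the actual implementation.

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

// Hypothetical sketch; the real KinesisUtils implementation is not shown in this excerpt.
final class KinesisStatusSketch {
  static StreamStatus getKinesisStreamStatus(KinesisClient client, String streamName) {
    return client.describeStream(DescribeStreamRequest.builder().streamName(streamName).build())
        .streamDescription().streamStatus();
  }

  static boolean isKinesisStreamActive(KinesisClient client, String streamName) {
    return getKinesisStreamStatus(client, streamName) == StreamStatus.ACTIVE;
  }
}
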
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java
similarity index 54%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java
index 4ea04f9895..3b40a6c417 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java
@@ -16,21 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.realtime.ingestion;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.pinot.controller.api.resources.PauseStatusDetails;
-import org.apache.pinot.controller.api.resources.TableViews;
+import org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.*;
+
 
 public class KafkaIncreaseDecreasePartitionsIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaIncreaseDecreasePartitionsIntegrationTest.class);
@@ -38,41 +38,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest 
extends BaseRealtime
   private static final String KAFKA_TOPIC = "meetup";
   private static final int NUM_PARTITIONS = 1;
 
-  String getExternalView(String tableName)
-      throws IOException {
-    return 
sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName));
-  }
-
-  void pauseTable(String tableName)
-      throws IOException {
-    
sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName));
-    TestUtils.waitForCondition((aVoid) -> {
-      try {
-        PauseStatusDetails pauseStatusDetails =
-            
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
-                PauseStatusDetails.class);
-        return pauseStatusDetails.getConsumingSegments().isEmpty();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }, 60_000L, "Failed to pause table: " + tableName);
-  }
-
-  void resumeTable(String tableName)
-      throws IOException {
-    
sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName));
-    TestUtils.waitForCondition((aVoid) -> {
-      try {
-        PauseStatusDetails pauseStatusDetails =
-            
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
-                PauseStatusDetails.class);
-        return !pauseStatusDetails.getConsumingSegments().isEmpty();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }, 60_000L, "Failed to resume table: " + tableName);
-  }
-
   String createTable()
       throws IOException {
     Schema schema = createSchema("simpleMeetup_schema.json");
@@ -83,24 +48,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest 
extends BaseRealtime
     return tableConfig.getTableName();
   }
 
-  void waitForNumConsumingSegmentsInEV(String tableName, int 
desiredNumConsumingSegments) {
-    TestUtils.waitForCondition((aVoid) -> {
-          try {
-            AtomicInteger numConsumingSegments = new AtomicInteger(0);
-            String state = getExternalView(tableName);
-            TableViews.TableView tableView = JsonUtils.stringToObject(state, 
TableViews.TableView.class);
-            tableView._realtime.values().forEach((v) -> {
-              numConsumingSegments.addAndGet((int) 
v.values().stream().filter((v1) -> v1.equals("CONSUMING")).count());
-            });
-            return numConsumingSegments.get() == desiredNumConsumingSegments;
-          } catch (IOException e) {
-            LOGGER.error("Exception in waitForNumConsumingSegments: {}", 
e.getMessage());
-            return false;
-          }
-        }, 5000, 300_000L,
-        "Failed to wait for " + desiredNumConsumingSegments + " consuming 
segments for table: " + tableName);
-  }
-
   @Test
   public void testDecreasePartitions()
       throws Exception {
@@ -108,7 +55,7 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest 
extends BaseRealtime
     LOGGER.info("Creating Kafka topic with {} partitions", NUM_PARTITIONS + 2);
     _kafkaStarters.get(0).createTopic(KAFKA_TOPIC, 
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS + 2));
     String tableName = createTable();
-    waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS + 2);
+    waitForNumSegmentsInDesiredStateInEV(tableName, CONSUMING, NUM_PARTITIONS 
+ 2, TableType.REALTIME);
 
     pauseTable(tableName);
 
@@ -118,7 +65,7 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest 
extends BaseRealtime
     _kafkaStarters.get(0).createTopic(KAFKA_TOPIC, 
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS));
 
     resumeTable(tableName);
-    waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS);
+    waitForNumSegmentsInDesiredStateInEV(tableName, CONSUMING, NUM_PARTITIONS, 
TableType.REALTIME);
   }
 
   @Test(enabled = false)
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java
new file mode 100644
index 0000000000..22220e26eb
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.controller.api.resources.TableViews;
+import 
org.apache.pinot.integration.tests.realtime.ingestion.utils.KinesisUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+
+
+public class KinesisShardChangeTest extends BaseKinesisIntegrationTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisShardChangeTest.class);
+
+  private static final String SCHEMA_FILE_PATH = 
"kinesis/airlineStats_data_reduced.schema";
+  private static final String DATA_FILE_PATH = 
"kinesis/airlineStats_data_reduced.json";
+  private static final Integer NUM_SHARDS = 2;
+
+  @BeforeMethod
+  public void beforeTest()
+      throws IOException {
+    createStream(NUM_SHARDS);
+    addSchema(createSchema(SCHEMA_FILE_PATH));
+    TableConfig tableConfig = createRealtimeTableConfig(null);
+    addTableConfig(tableConfig);
+  }
+
+  @AfterMethod
+  public void afterTest()
+      throws IOException {
+    dropRealtimeTable(getTableName());
+    deleteSchema(getTableName());
+    deleteStream();
+  }
+
+  /**
+   * Data provider for shard split and merge tests with different offset 
combinations.
+   * Documentation is in the test method.
+   */
+  @DataProvider(name = "shardOffsetCombinations")
+  public Object[][] shardOffsetCombinations() {
+    return new Object[][]{
+        {"split", "smallest", "lastConsumed", 100, 250, 4, 4},
+        {"split", "smallest", null, 100, 250, 4, 4},
+        {"split", "largest", "lastConsumed", 50, 200, 2, 4},
+        {"split", "largest", null, 50, 200, 2, 4},
+        {"split", "lastConsumed", "lastConsumed", 200, 200, 6, 4},
+        {"split", "lastConsumed", "largest", 200, 200, 6, 4},
+        {"split", "lastConsumed", null, 200, 200, 2, 4},
+        {"split", null, null, 200, 200, 2, 4},
+        {"merge", "smallest", "lastConsumed", 100, 250, 4, 1},
+        {"merge", "smallest", null, 100, 250, 4, 1},
+        {"merge", "largest", "lastConsumed", 50, 200, 2, 1},
+        {"merge", "largest", null, 50, 200, 2, 1},
+        {"merge", "lastConsumed", "lastConsumed", 200, 200, 3, 1},
+        {"merge", "lastConsumed", "largest", 200, 200, 3, 1},
+        {"merge", "lastConsumed", null, 200, 200, 2, 1},
+        {"merge", null, null, 200, 200, 2, 1},
+    };
+  }
+
+  /**
+   * Test case to validate shard split/merge behavior with different offset 
combinations.
+   * The expectation is that
+   * 1. when "smallest" offset is used, the old parent shards would be 
consumed first.
+   *    New shards will not be consumed until RVM is run or resume() is called 
with lastConsumed / largest offset
+   * 2. when "largest" offset is used, only new records would be consumed and 
all prior records pushed to kinesis
+   *    would be skipped.
+   * 3. when "lastConsumed" offset is used, data would be consumed based on 
the last consumed offset.
+   * 4. when RealtimeSegmentValidationManager is triggered, the behaviour should be the same as calling
+   *    resume() with "lastConsumed" offset.
+   * @param operation - "split" or "merge"
+   * @param firstOffsetCriteria - Offset criteria for the first resume call.
+   *                            If it's null, RealtimeSegmentValidationManager 
is triggered
+   * @param secondOffsetCriteria - Offset criteria for the second resume call.
+   *                             If it's null, 
RealtimeSegmentValidationManager is triggered
+   * @param firstExpectedRecords - Expected records after the first resume call
+   * @param secondExpectedRecords - Expected records after the second resume 
call
+   * @param expectedOnlineSegments - Expected number of online segments in the 
end
+   * @param expectedConsumingSegments - Expected Number of consuming segments 
in the end
+   */
+  @Test(dataProvider = "shardOffsetCombinations")
+  public void testShardOperationsWithOffsets(String operation, String 
firstOffsetCriteria, String secondOffsetCriteria,
+      int firstExpectedRecords, int secondExpectedRecords, int 
expectedOnlineSegments,
+      int expectedConsumingSegments)
+      throws Exception {
+
+    // Publish initial records and wait for them to be consumed
+    publishRecordsToKinesis(0, 50);
+    waitForRecordsToBeConsumed(getTableName(), 50); // pinot has created 2 
segments
+
+    // Perform shard operation (split or merge)
+    if ("split".equals(operation)) {
+      KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits 
shard 0 into shard 2 & 3
+      KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits 
shard 1 into shard 4 & 5
+    } else if ("merge".equals(operation)) {
+      KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 0, 1); // merges 
shard 0 & 1 into shard 2
+    }
+
+    // Publish more records after shard operation. These will go to the new 
shards
+    publishRecordsToKinesis(50, 200);
+
+    if (firstOffsetCriteria != null) {
+      // Pause and resume with the first offset criteria
+      pauseTable(getTableName()); // This will commit the current segments
+      resumeTable(getTableName(), firstOffsetCriteria);
+    } else {
+      runRealtimeSegmentValidationTask(getTableName());
+    }
+
+    waitForRecordsToBeConsumed(getTableName(), firstExpectedRecords); // Pinot 
will create new segments
+
+    if (secondOffsetCriteria != null) {
+      // Pause and resume with the second offset criteria
+      pauseTable(getTableName()); // This will commit the current segments
+      resumeTable(getTableName(), secondOffsetCriteria);
+    } else {
+      runRealtimeSegmentValidationTask(getTableName());
+    }
+
+    waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords); // 
Pinot will create new segments
+
+    // Publish more records after shard operation. These will go to the new 
shards
+    publishRecordsToKinesis(100, 200);
+    if (secondOffsetCriteria != null && 
secondOffsetCriteria.equals("largest")) {
+      // TODO - Fix this. Remove the check for largest offset. If largest 
offset is used,
+      //  we should have consumed the 100 records published after table was 
resumed.
+      //  Currently this is not happening. Thus the assertion is without the 
new records
+      //  We currently rely on RVM to fix the consumption
+      waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords);
+    } else {
+      waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords + 100);
+    }
+
+    runRealtimeSegmentValidationTask(getTableName());
+    waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords + 100);
+
+    // Validate the final state of segments
+    validateSegmentStates(getTableName(), expectedOnlineSegments, 
expectedConsumingSegments);
+  }
+
+  /**
+   * Data provider for new table tests with different offset combinations.
+   * Documentation is in the test method.
+   */
+  @DataProvider(name = "initialOffsetCombinations")
+  public Object[][] initialOffsetCombinations() {
+    return new Object[][]{
+        {"smallest", 50, 200},
+        {"largest", 50, 200}, // TODO - Validate if table created with largest 
offset should not consume old records
+        {"lastConsumed", 50, 200}
+    };
+  }
+
+  /**
+   * Test case to split shards, then create new table and check consumption
+   * For the sake of brevity, we will only test shard split and calling 
Realtime Validation Manager
+   * Individually, pause and resume have been verified for shard split / merge 
operations
+   */
+  @Test(dataProvider = "initialOffsetCombinations")
+  public void testNewTableAfterShardSplit(String offsetCriteria, int 
firstExpectedRecords, int secondExpectedRecords)
+      throws Exception {
+    // Publish initial records
+    publishRecordsToKinesis(0, 50);
+
+    // Split the shards
+    KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits 
shard 0 into shard 2 & 3
+    KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits 
shard 1 into shard 4 & 5
+
+    // new table is created with defined offset criteria but listening to the 
original stream
+    String name = getTableName() + "_" + offsetCriteria;
+    createNewSchemaAndTable(name, offsetCriteria);
+
+    waitForRecordsToBeConsumed(name, firstExpectedRecords);
+
+    // publish more records. These will go to the new shards
+    publishRecordsToKinesis(50, 200);
+    waitForRecordsToBeConsumed(name, firstExpectedRecords); // pinot doesn't 
listen to new shards yet.
+
+    // Trigger RVM. This will commit the current segments and start consuming 
from the new shards
+    runRealtimeSegmentValidationTask(name);
+    waitForRecordsToBeConsumed(name, secondExpectedRecords);
+
+    // Validate the final state of segments
+    validateSegmentStates(name, 2, 4);
+
+    dropNewSchemaAndTable(name);
+  }
+
+  /**
+   * Test case to first split shards, then merge some shards.
+   * For the sake of brevity, we will only test by calling Realtime Validation 
Manager
+   * Individually, pause and resume have been verified for shard split / merge 
operations
+   */
+  @Test
+  public void testSplitAndMergeShards()
+      throws Exception {
+    // Publish initial records
+    publishRecordsToKinesis(0, 50);
+    waitForRecordsToBeConsumed(getTableName(), 50); // pinot has created 2 
segments
+
+    // Split the shards
+    KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits 
shard 0 into shard 2 & 3
+    KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits 
shard 1 into shard 4 & 5
+
+    // Publish more records after shard operation. These will go to the new 
shards
+    publishRecordsToKinesis(50, 175);
+
+    // Merge some shards
+    KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 2, 3); // merges 
shard 2 & 3 into shard 6
+    KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 4, 5); // merges 
shard 4 & 5 into shard 7
+
+    // Publish more records after shard operation. These will go to the new 
shards
+    publishRecordsToKinesis(175, 200);
+
+    // Trigger RVM. This will commit segments 0 and 1 and start consuming from 
shards 2-5
+    runRealtimeSegmentValidationTask(getTableName());
+    waitForRecordsToBeConsumed(getTableName(), 175);
+
+    // Trigger RVM. This will commit segments 2-5 and start consuming from 
shards 6-7
+    runRealtimeSegmentValidationTask(getTableName());
+    waitForRecordsToBeConsumed(getTableName(), 200);
+
+    // Validate that 8 segments are created in total
+    validateSegmentStates(getTableName(), 6, 2);
+  }
+
+  /**
+   * Test case to continuously publish records to kinesis (in a background 
thread) and concurrently split shards
+   * and concurrently call pause and resume APIs or RVM and finally validate 
the total count of records
+   */
+  @Test
+  public void testConcurrentShardSplit()
+      throws IOException, InterruptedException {
+    // Start a background thread to continuously publish records to kinesis
+    Thread publisherThread = new Thread(() -> {
+      try {
+        for (int i = 0; i < 200; i += 5) {
+          publishRecordsToKinesis(i, i + 5);
+          Thread.sleep(1000);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error while publishing records to kinesis", e);
+      }
+    });
+    publisherThread.start(); // This will take ~40 secs to complete with 5 
records ingested per second
+
+    Thread.sleep(5000);
+
+    // Split the shards
+    KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits 
shard 0 into shard 2 & 3
+    KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits 
shard 1 into shard 4 & 5
+
+    Thread.sleep(5000);
+
+    // Trigger RVM. This will commit segments 0 and 1 and start consuming from 
shards 2-5
+    runRealtimeSegmentValidationTask(getTableName()); // This will commit 
segments 0-1 and start consuming from 2-5
+
+    // Merge some shards
+    KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 2, 3); // merges 
shard 2 & 3 into shard 6
+    KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 4, 5); // merges 
shard 4 & 5 into shard 7
+
+    Thread.sleep(5000);
+
+    // Call pause and resume APIs
+    pauseTable(getTableName()); // This will commit segments 2-5
+    resumeTable(getTableName(), "lastConsumed"); // start consuming from 
shards 6-7
+
+    // Wait for the publisher thread to finish
+    try {
+      publisherThread.join();
+    } catch (InterruptedException e) {
+      LOGGER.error("Error while waiting for publisher thread to finish", e);
+    }
+
+    waitForRecordsToBeConsumed(getTableName(), 200);
+
+    // Validate that all records are consumed
+    validateSegmentStates(getTableName(), 6, 2);
+  }
+
+  private void validateSegmentStates(String tableName, int 
expectedOnlineSegments, int expectedConsumingSegments)
+      throws IOException {
+    TableViews.TableView tableView = getExternalView(tableName, 
TableType.REALTIME);
+    Assert.assertEquals(tableView._realtime.size(), expectedOnlineSegments + 
expectedConsumingSegments);
+
+    List<String> onlineSegments = tableView._realtime.entrySet().stream()
+        .filter(x -> x.getValue().containsValue(ONLINE))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+    Assert.assertEquals(onlineSegments.size(), expectedOnlineSegments);
+
+    List<String> consumingSegments = tableView._realtime.entrySet().stream()
+        .filter(x -> x.getValue().containsValue(CONSUMING))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+    Assert.assertEquals(consumingSegments.size(), expectedConsumingSegments);
+  }
+
+  /**
+   * start and end offsets are essentially the start row index and end row 
index of the file
+   *
+   * @param startOffset - inclusive
+   * @param endOffset   - exclusive
+   */
+  private void publishRecordsToKinesis(int startOffset, int endOffset)
+      throws Exception {
+    InputStream inputStream = 
RealtimeKinesisIntegrationTest.class.getClassLoader()
+        .getResourceAsStream(KinesisShardChangeTest.DATA_FILE_PATH);
+    assert inputStream != null;
+    try (BufferedReader br = new BufferedReader(new 
InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+      String line;
+      int count = 0;
+      while ((line = br.readLine()) != null) {
+        // Skip the first startOffset lines
+        if (count < startOffset) {
+          count++;
+          continue;
+        }
+        if (count++ >= endOffset) {
+          break;
+        }
+        JsonNode data = JsonUtils.stringToJsonNode(line);
+        PutRecordResponse putRecordResponse = putRecord(line, 
data.get("Origin").textValue());
+        if (putRecordResponse.sdkHttpResponse().statusCode() != 200) {
+          throw new RuntimeException("Failed to put record " + line + " to 
Kinesis stream with status code: "
+              + putRecordResponse.sdkHttpResponse().statusCode());
+        }
+      }
+    }
+  }
+
+  private void waitForRecordsToBeConsumed(String tableName, int 
expectedNumRecords)
+      throws InterruptedException {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        long count = getPinotConnection().execute("SELECT COUNT(*) FROM " + 
tableName).getResultSet(0).getLong(0);
+        if (count != expectedNumRecords) {
+          LOGGER.warn("Expected {} records, but got {} records. Retrying", 
expectedNumRecords, count);
+        }
+        return count == expectedNumRecords;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 2000, 60_000L, "Wait for all records to be ingested");
+    // Sleep for a few seconds and validate the count again (to ensure no more records are ingested)
+    Thread.sleep(2000);
+    long count = getPinotConnection().execute("SELECT COUNT(*) FROM " + 
tableName).getResultSet(0).getLong(0);
+    Assert.assertEquals(count, expectedNumRecords);
+  }
+
+  private void createNewSchemaAndTable(String name, String offsetCriteria)
+      throws IOException {
+    Schema schema = createSchema(SCHEMA_FILE_PATH);
+    schema.setSchemaName(name);
+    addSchema(schema);
+
+    TableConfigBuilder tableConfigBuilder = 
getTableConfigBuilder(TableType.REALTIME);
+    tableConfigBuilder.setTableName(name);
+    Map<String, String> streamConfigs = getStreamConfigs();
+    
streamConfigs.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+        StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), 
offsetCriteria);
+    tableConfigBuilder.setStreamConfigs(streamConfigs);
+    TableConfig tableConfig = tableConfigBuilder.build();
+    addTableConfig(tableConfig);
+  }
+
+  private void dropNewSchemaAndTable(String name)
+      throws IOException {
+    dropRealtimeTable(name);
+    deleteSchema(name);
+  }
+
+  @Override
+  public List<String> getNoDictionaryColumns() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String getSortedColumn() {
+    return null;
+  }
+}
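
As a rough sketch of how a shard-change test is expected to drive the helpers above (the KinesisUtils call, _kinesisClient and getStreamName() are assumptions based on the rest of this commit, not an excerpt from it):

    // Hypothetical flow: publish rows [0, 100), split a shard, publish rows [100, 200)
    publishRecordsToKinesis(0, 100);
    waitForRecordsToBeConsumed(getTableName(), 100);
    // _kinesisClient and getStreamName() assumed to come from the base Kinesis test class
    KinesisUtils.splitNthShard(_kinesisClient, getStreamName(), 0);
    publishRecordsToKinesis(100, 200);
    waitForRecordsToBeConsumed(getTableName(), 200);
    validateSegmentStates(getTableName(), 6, 2);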
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java
similarity index 51%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java
index 1e33a0b7fe..1f5569a603 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java
@@ -16,29 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
-
-import cloud.localstack.Localstack;
-import cloud.localstack.ServiceName;
-import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
-import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
-import cloud.localstack.docker.annotation.LocalstackDockerProperties;
-import cloud.localstack.docker.command.Command;
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeType;
 import com.google.common.base.Function;
 import java.io.BufferedReader;
-import java.io.File;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -46,95 +36,45 @@ import java.util.List;
 import java.util.Map;
 import javax.activation.UnsupportedDataTypeException;
 import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.client.ResultSet;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.StringUtil;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.utils.AttributeMap;
 
 
-@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = 
"2.3.2")
-public class RealtimeKinesisIntegrationTest extends 
BaseClusterIntegrationTestSet {
+public class RealtimeKinesisIntegrationTest extends BaseKinesisIntegrationTest 
{
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class);
 
-  private static final LocalstackDockerAnnotationProcessor PROCESSOR = new 
LocalstackDockerAnnotationProcessor();
-  private static final String STREAM_NAME = "kinesis-test";
-  private static final String STREAM_TYPE = "kinesis";
-
-  public static final String REGION = "us-east-1";
-  public static final String LOCALSTACK_KINESIS_ENDPOINT = 
"http://localhost:4566";;
-  public static final int NUM_SHARDS = 10;
+  private static final int NUM_SHARDS = 10;
 
   // Localstack Kinesis doesn't support large rows.
   // So, this airlineStats data file consists of only few fields and rows from 
the original data
-  public static final String SCHEMA_FILE_PATH = 
"kinesis/airlineStats_data_reduced.schema";
-  public static final String DATA_FILE_PATH = 
"kinesis/airlineStats_data_reduced.json";
-
-  private final Localstack _localstackDocker = Localstack.INSTANCE;
-
-  private static KinesisClient _kinesisClient = null;
+  private static final String SCHEMA_FILE_PATH = 
"kinesis/airlineStats_data_reduced.schema";
+  private static final String DATA_FILE_PATH = 
"kinesis/airlineStats_data_reduced.json";
 
   private long _totalRecordsPushedInStream = 0;
 
   List<String> _h2FieldNameAndTypes = new ArrayList<>();
 
-  private boolean _skipTestNoDockerInstalled = false;
-
   @BeforeClass
   public void setUp()
       throws Exception {
-    try {
-      DockerInfoCommand dockerInfoCommand = new DockerInfoCommand();
-      dockerInfoCommand.execute();
-    } catch (IllegalStateException e) {
-      _skipTestNoDockerInstalled = true;
-      LOGGER.warn("Skipping test! Docker is not found running", e);
-      throw new SkipException(e.getMessage());
-    }
-
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    // Start the Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServer();
+    super.setUp();
 
-    // Start Kinesis
-    startKinesis();
+    // Create new stream
+    createStream(NUM_SHARDS);
 
     // Create and upload the schema and table config
-    addSchema(createKinesisSchema());
-    addTableConfig(createKinesisTableConfig());
+    addSchema(createSchema(SCHEMA_FILE_PATH));
+    addTableConfig(createRealtimeTableConfig(null));
 
     createH2ConnectionAndTable();
 
@@ -145,13 +85,6 @@ public class RealtimeKinesisIntegrationTest extends 
BaseClusterIntegrationTestSe
     waitForAllDocsLoadedKinesis(120_000L);
   }
 
-  public Schema createKinesisSchema()
-      throws Exception {
-    URL resourceUrl = 
BaseClusterIntegrationTest.class.getClassLoader().getResource(SCHEMA_FILE_PATH);
-    Assert.assertNotNull(resourceUrl);
-    return Schema.fromFile(new File(resourceUrl.getFile()));
-  }
-
   protected void waitForAllDocsLoadedKinesis(long timeoutMs)
       throws Exception {
     waitForAllDocsLoadedKinesis(timeoutMs, true);
@@ -172,79 +105,14 @@ public class RealtimeKinesisIntegrationTest extends 
BaseClusterIntegrationTestSe
     }, 1000L, timeoutMs, "Failed to load " + _totalRecordsPushedInStream + " 
documents", raiseError);
   }
 
-  public TableConfig createKinesisTableConfig() {
-    return new 
TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
-        
.setTimeColumnName("DaysSinceEpoch").setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
-        
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
-        
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        
.setStreamConfigs(createKinesisStreamConfig()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
-  }
-
-  public Map<String, String> createKinesisStreamConfig() {
-    Map<String, String> streamConfigMap = new HashMap<>();
-    String streamType = "kinesis";
-    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-
-    streamConfigMap.put(
-        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, 
StreamConfigProperties.STREAM_TOPIC_NAME),
-        STREAM_NAME);
-
-    streamConfigMap.put(
-        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, 
StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
-        "30000");
-    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
-        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), 
KinesisConsumerFactory.class.getName());
-    streamConfigMap.put(
-        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, 
StreamConfigProperties.STREAM_DECODER_CLASS),
-        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
-    streamConfigMap.put(KinesisConfig.REGION, REGION);
-    streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
-    streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT);
-    streamConfigMap.put(KinesisConfig.ACCESS_KEY, 
getLocalAWSCredentials().resolveCredentials().accessKeyId());
-    streamConfigMap.put(KinesisConfig.SECRET_KEY, 
getLocalAWSCredentials().resolveCredentials().secretAccessKey());
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
Integer.toString(200));
-    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
-        StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
-    return streamConfigMap;
-  }
-
-  public void startKinesis()
-      throws Exception {
-    final LocalstackDockerConfiguration dockerConfig = 
PROCESSOR.process(this.getClass());
-    StopAllLocalstackDockerCommand stopAllLocalstackDockerCommand = new 
StopAllLocalstackDockerCommand();
-    stopAllLocalstackDockerCommand.execute();
-    _localstackDocker.startup(dockerConfig);
-
-    _kinesisClient = KinesisClient.builder().httpClient(new 
ApacheSdkHttpService().createHttpClientBuilder()
-            .buildWithDefaults(
-                
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, 
Boolean.TRUE).build()))
-        
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION))
-        .endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
-
-    
_kinesisClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(NUM_SHARDS).build());
-
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Nullable
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          String kinesisStreamStatus =
-              
_kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(STREAM_NAME).build())
-                  .streamDescription().streamStatusAsString();
-
-          return kinesisStreamStatus.contentEquals("ACTIVE");
-        } catch (Exception e) {
-          LOGGER.warn("Could not fetch kinesis stream status", e);
-          return null;
-        }
-      }
-    }, 1000L, 30000, "Kinesis stream " + STREAM_NAME + " is not created or is 
not in active state", true);
+  @Override
+  public List<String> getNoDictionaryColumns() {
+    return Collections.emptyList();
   }
 
-  public void stopKinesis() {
-    if (_localstackDocker.isRunning()) {
-      _localstackDocker.stop();
-    }
+  @Override
+  public String getSortedColumn() {
+    return null;
   }
 
   private void publishRecordsToKinesis() {
@@ -264,10 +132,7 @@ public class RealtimeKinesisIntegrationTest extends 
BaseClusterIntegrationTestSe
         while ((line = br.readLine()) != null) {
           JsonNode data = JsonUtils.stringToJsonNode(line);
 
-          PutRecordRequest putRecordRequest =
-              
PutRecordRequest.builder().streamName(STREAM_NAME).data(SdkBytes.fromUtf8String(line))
-                  .partitionKey(data.get("Origin").textValue()).build();
-          PutRecordResponse putRecordResponse = 
_kinesisClient.putRecord(putRecordRequest);
+          PutRecordResponse putRecordResponse = putRecord(line, 
data.get("Origin").textValue());
           if (putRecordResponse.sdkHttpResponse().statusCode() == 200) {
             if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) && 
StringUtils.isNotBlank(
                 putRecordResponse.shardId())) {
@@ -303,10 +168,6 @@ public class RealtimeKinesisIntegrationTest extends 
BaseClusterIntegrationTestSe
     }
   }
 
-  private static AwsCredentialsProvider getLocalAWSCredentials() {
-    return 
StaticCredentialsProvider.create(AwsBasicCredentials.create("access", 
"secret"));
-  }
-
   @Test
   public void testRecords()
       throws Exception {
@@ -435,42 +296,7 @@ public class RealtimeKinesisIntegrationTest extends 
BaseClusterIntegrationTestSe
   @AfterClass
   public void tearDown()
       throws Exception {
-    if (_skipTestNoDockerInstalled) {
-      return;
-    }
-
     dropRealtimeTable(getTableName());
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-    stopKinesis();
-    FileUtils.deleteDirectory(_tempDir);
-  }
-
-  public static class StopAllLocalstackDockerCommand extends Command {
-
-    public void execute() {
-      String runningDockerContainers =
-          dockerExe.execute(Arrays.asList("ps", "-a", "-q", "-f", 
"ancestor=localstack/localstack"));
-      if (StringUtils.isNotBlank(runningDockerContainers) && 
!runningDockerContainers.toLowerCase().contains("error")) {
-        String[] containerList = runningDockerContainers.split("\n");
-
-        for (String containerId : containerList) {
-          dockerExe.execute(Arrays.asList("stop", containerId));
-        }
-      }
-    }
-  }
-
-  public static class DockerInfoCommand extends Command {
-
-    public void execute() {
-      String dockerInfo = dockerExe.execute(Collections.singletonList("info"));
-
-      if (dockerInfo.toLowerCase().contains("error")) {
-        throw new IllegalStateException("Docker daemon is not running!");
-      }
-    }
+    super.tearDown();
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java
new file mode 100644
index 0000000000..8f72d6f81e
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.ingestion.utils;
+
+import java.math.BigInteger;
+import java.time.Duration;
+import java.util.List;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.SplitShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SplitShardResponse;
+
+
+public class KinesisUtils {
+
+  private KinesisUtils() {
+  }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisUtils.class);
+
+  public static void splitNthShard(KinesisClient kinesisClient, String stream, 
int index) {
+    List<Shard> shards = getShards(kinesisClient, stream);
+    int initialSize = shards.size();
+    splitShard(kinesisClient, stream, shards.get(index));
+    LOGGER.info("Splitted shard with ID: " + shards.get(index).shardId());
+
+    TestUtils.waitForCondition((avoid) -> isKinesisStreamActive(kinesisClient, 
stream)
+            && getShards(kinesisClient, stream).size() == initialSize + 2,
+        2000, Duration.ofMinutes(1).toMillis(), "Waiting for Kinesis stream to 
be active and shards to be split");
+  }
+
+  public static void mergeShards(KinesisClient kinesisClient, String stream, 
int index1, int index2) {
+    List<Shard> shards = getShards(kinesisClient, stream);
+    int initialSize = shards.size();
+    mergeShard(kinesisClient, stream, shards.get(index1), shards.get(index2));
+    LOGGER.info("Merged shard with ID: " + shards.get(index1).shardId() + " 
and " + shards.get(index2).shardId());
+
+    TestUtils.waitForCondition((avoid) -> isKinesisStreamActive(kinesisClient, 
stream)
+            && getShards(kinesisClient, stream).size() == initialSize + 1,
+        2000, Duration.ofMinutes(1).toMillis(), "Waiting for Kinesis stream to 
be active and shards to be merged");
+  }
+
+  public static boolean isKinesisStreamActive(KinesisClient kinesisClient, 
String streamName) {
+    try {
+      String kinesisStreamStatus = getKinesisStreamStatus(kinesisClient, 
streamName);
+      boolean isActive = kinesisStreamStatus.contentEquals("ACTIVE");
+      if (!isActive) {
+        LOGGER.warn("Kinesis stream " + streamName + " in state" + 
kinesisStreamStatus);
+      }
+      return isActive;
+    } catch (ResourceNotFoundException e) {
+      LOGGER.warn("Kinesis stream " + streamName + " not found");
+      return false;
+    }
+  }
+
+  public static String getKinesisStreamStatus(KinesisClient kinesisClient, 
String streamName) {
+    return 
kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(streamName).build())
+        .streamDescription().streamStatusAsString();
+  }
+
+  private static List<Shard> getShards(KinesisClient kinesisClient, String 
stream) {
+    ListShardsResponse listShardsResponse =
+        
kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build());
+    return listShardsResponse.shards();
+  }
+
+  private static SplitShardResponse splitShard(KinesisClient kinesisClient, 
String stream, Shard shard) {
+    BigInteger startHash = new 
BigInteger(shard.hashKeyRange().startingHashKey());
+    BigInteger endHash = new BigInteger(shard.hashKeyRange().endingHashKey());
+    BigInteger newStartingHashKey = startHash.add(endHash).divide(new 
BigInteger("2"));
+    return kinesisClient.splitShard(SplitShardRequest.builder()
+        .shardToSplit(shard.shardId())
+        .streamName(stream)
+        .newStartingHashKey(newStartingHashKey.toString())
+        .build());
+  }
+
+  private static MergeShardsResponse mergeShard(KinesisClient kinesisClient, 
String stream, Shard shard1,
+      Shard shard2) {
+    return kinesisClient.mergeShards(MergeShardsRequest.builder()
+        .shardToMerge(shard1.shardId())
+        .adjacentShardToMerge(shard2.shardId())
+        .streamName(stream)
+        .build());
+  }
+}
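
To illustrate the hash-key arithmetic in splitShard() above, assuming a shard that covers the full Kinesis hash range (0 to 2^128 - 1), the midpoint used as the new starting hash key works out as:

    BigInteger startHash = new BigInteger("0");
    BigInteger endHash = new BigInteger("340282366920938463463374607431768211455"); // 2^128 - 1
    // Integer midpoint becomes the starting hash key of the second child shard
    BigInteger newStartingHashKey = startHash.add(endHash).divide(new BigInteger("2"));
    // newStartingHashKey = 170141183460469231731687303715884105727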
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index de876b3071..f5a905e111 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -61,6 +61,16 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
     super(config, kinesisClient);
   }
 
+  /**
+   * Based on Kinesis documentation, we might get a response with empty 
records but a non-null nextShardIterator.
+   * Known cases are:
+   *  1. When the shard has ended (has been split or merged) and we need a 
couple of calls to getRecords() to reach
+   *  a null iterator
+   *  2. When there are no new messages in the shard but the shard is active. 
We will continue to get a non-null
+   *  nextShardIterator in this case
+   *  3. When there are some messages in the shard, but we need a few 
iterations to get them.
+   * This needs to be handled by the client with an appropriate retry strategy.
+   */
   @Override
   public synchronized KinesisMessageBatch 
fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
     try {
@@ -98,6 +108,7 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
     GetRecordsRequest getRecordRequest =
         
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
     GetRecordsResponse getRecordsResponse = 
_kinesisClient.getRecords(getRecordRequest);
+
     List<Record> records = getRecordsResponse.records();
     List<BytesStreamMessage> messages;
     KinesisPartitionGroupOffset offsetOfNextBatch;
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 612ea38098..d9b5f17e39 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -197,6 +197,23 @@ public class KinesisStreamMetadataProvider implements 
StreamMetadataProvider {
     return newPartitionGroupMetadataList;
   }
 
+  /**
+   * Refer to the documentation of {@link #computePartitionGroupMetadata(String, StreamConfig, List, int)}
+   * @param forceGetOffsetFromStream - the flag is not required for the Kinesis stream. The Kinesis implementation
+   *                                 takes care of returning non-null offsets for all old and new partitions.
+   *                                 The flag is primarily required for the Kafka stream, which would need
+   *                                 refactoring to avoid this flag. More details in {@link
+   *                                 
StreamMetadataProvider#computePartitionGroupMetadata(
+   *                                 String, StreamConfig, List, int, boolean)}
+   */
+  @Override
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, 
int timeoutMillis,
+      boolean forceGetOffsetFromStream)
+      throws IOException, TimeoutException {
+    return computePartitionGroupMetadata(clientId, streamConfig, 
partitionGroupConsumptionStatuses, timeoutMillis);
+  }
+
   /**
    * Converts a shardId string to a partitionGroupId integer by parsing the 
digits of the shardId
    * e.g. "shardId-000000000001" becomes 1
@@ -213,8 +230,29 @@ public class KinesisStreamMetadataProvider implements 
StreamMetadataProvider {
       throws IOException, TimeoutException {
     try (PartitionGroupConsumer partitionGroupConsumer = 
_kinesisStreamConsumerFactory.createPartitionGroupConsumer(
         _clientId, partitionGroupConsumptionStatus)) {
-      MessageBatch<?> messageBatch = 
partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs);
-      return messageBatch.getMessageCount() == 0 && 
messageBatch.isEndOfPartitionGroup();
+      int attempts = 0;
+      while (true) {
+        MessageBatch<?> messageBatch = 
partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs);
+        if (messageBatch.getMessageCount() > 0) {
+          // There are messages left to be consumed so we haven't consumed the 
shard fully
+          return false;
+        }
+        if (messageBatch.isEndOfPartitionGroup()) {
+          // Shard can't be iterated further. We have consumed all the 
messages because message count = 0
+          return true;
+        }
+        // Even though message count = 0, shard can be iterated further.
+        // Based on kinesis documentation, there might be more records to be 
consumed.
+        // So we need to fetch messages again to check if we have reached end 
of shard.
+        // To prevent an infinite loop (known cases listed in 
fetchMessages()), we will limit the number of attempts
+        attempts++;
+        if (attempts >= 5) {
+          LOGGER.warn("Reached max attempts to check if end of shard reached 
from checkpoint {}. "
+                  + " Assuming we have not consumed till end of shard.", 
startCheckpoint);
+          return false;
+        }
+        // Continue to fetch messages. Reusing the partitionGroupConsumer ensures we use the new shard iterator.
+      }
     }
   }
 
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
index c6cf493370..7ad46919f2 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
@@ -179,6 +179,38 @@ public class KinesisStreamMetadataProviderTest {
     Assert.assertEquals(result.size(), 1);
     Assert.assertEquals(result.get(0).getPartitionGroupId(), 1);
     
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
 1);
+
+    // Simulate the case where the initial calls to fetchMessages return empty messages but a non-null next
+    // shard iterator
+    when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(), 
intArguments.capture()))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, true));
+    result =
+        
_kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, 
getStreamConfig(),
+            currentPartitionGroupMeta, TIMEOUT);
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).getPartitionGroupId(), 1);
+    
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
 1);
+
+    // Simulate the case where all calls to fetchMessages return empty messages and a non-null next shard iterator
+    when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(), 
intArguments.capture()))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false))
+        .thenReturn(new KinesisMessageBatch(new ArrayList<>(), 
kinesisPartitionGroupOffset, false));
+
+    result =
+        
_kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, 
getStreamConfig(),
+            currentPartitionGroupMeta, TIMEOUT);
+
+    Assert.assertEquals(result.size(), 2);
+    Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
+    
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
 1);
+    Assert.assertEquals(result.get(1).getPartitionGroupId(), 1);
+    
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
 1);
   }
 
   @Test
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 77eac3832e..01f429a511 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -47,16 +47,6 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
     _idleTimeoutMs = idleTimeoutMs;
   }
 
-  private boolean isOffsetCaughtUp(StreamPartitionMsgOffset currentOffset, 
StreamPartitionMsgOffset latestOffset) {
-    if (currentOffset != null && latestOffset != null) {
-      // Kafka's "latest" offset is actually the next available offset. 
Therefore it will be 1 ahead of the
-      // current offset in the case we are caught up.
-      // TODO: implement a way to have this work correctly for kafka consumers
-      return currentOffset.compareTo(latestOffset) >= 0;
-    }
-    return false;
-  }
-
   private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
     return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
   }
@@ -84,7 +74,7 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
     // the stream consumer to check partition count if we're already caught up.
     StreamPartitionMsgOffset currentOffset = 
rtSegmentDataManager.getCurrentOffset();
     StreamPartitionMsgOffset latestStreamOffset = 
rtSegmentDataManager.fetchLatestStreamOffset(5000);
-    if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) {
+    if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
       _logger.info("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
               + "But the current ingested offset is equal to the latest 
available offset {}.", segmentName, freshnessMs,
           _minFreshnessMs, currentOffset);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index c6fe0d16d6..18d08dd3d5 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -135,4 +136,24 @@ public abstract class 
IngestionBasedConsumptionStatusChecker {
   }
 
   protected abstract boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager);
+
+  protected boolean isOffsetCaughtUp(String segmentName,
+      StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset 
latestOffset) {
+    if (currentOffset != null && latestOffset != null) {
+      // Kafka's "latest" offset is actually the next available offset. 
Therefore it will be 1 ahead of the
+      // current offset in the case we are caught up.
+      // TODO: implement a way to have this work correctly for kafka consumers
+      _logger.info("Null offset found for segment {} - current offset: {}, 
latest offset: {}. "
+          + "Will check consumption status later", segmentName, currentOffset, 
latestOffset);
+      try {
+        return currentOffset.compareTo(latestOffset) >= 0;
+      } catch (NullPointerException e) {
+        // This can happen if the offsets are not comparable,
+        // Eg: Sequence number missing for a kinesis shard
+        _logger.info("Unable to compare offsets for segment {} - current 
offset: {}, latest offset: {}. "
+            + "Will check consumption status later", segmentName, 
currentOffset, latestOffset);
+      }
+    }
+    return false;
+  }
 }
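
As a concrete example of the Kafka caveat noted in the comment above: if a partition holds messages at offsets 0 through 99, Kafka reports the "latest" offset as 100 (the next offset to be assigned) while a fully caught-up consumer's current offset is 99, so currentOffset.compareTo(latestOffset) >= 0 does not hold even though nothing is left to consume; hence the TODO to handle Kafka consumers correctly.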
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
index ad7d2905ba..b4f2ba12e2 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -45,18 +45,15 @@ public class OffsetBasedConsumptionStatusChecker extends 
IngestionBasedConsumpti
   protected boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager) {
     StreamPartitionMsgOffset latestIngestedOffset = 
rtSegmentDataManager.getCurrentOffset();
     StreamPartitionMsgOffset latestStreamOffset = 
rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
-    if (latestStreamOffset == null || latestIngestedOffset == null) {
-      _logger.info("Null offset found for segment {} - latest stream offset: 
{}, latest ingested offset: {}. "
-          + "Will check consumption status later", segmentName, 
latestStreamOffset, latestIngestedOffset);
-      return false;
-    }
-    if (latestIngestedOffset.compareTo(latestStreamOffset) < 0) {
-      _logger.info("Latest ingested offset {} in segment {} is smaller than 
stream latest available offset {} ",
-          latestIngestedOffset, segmentName, latestStreamOffset);
-      return false;
+
+    if (isOffsetCaughtUp(segmentName, latestIngestedOffset, 
latestStreamOffset)) {
+      _logger.info("Segment {} with latest ingested offset {} has caught up to 
the latest stream offset {}",
+          segmentName, latestIngestedOffset, latestStreamOffset);
+      return true;
     }
-    _logger.info("Segment {} with latest ingested offset {} has caught up to 
the latest stream offset {}", segmentName,
-        latestIngestedOffset, latestStreamOffset);
-    return true;
+
+    _logger.info("Latest ingested offset {} in segment {} is smaller than 
stream latest available offset {} ",
+        latestIngestedOffset, segmentName, latestStreamOffset);
+    return false;
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 30cbe8bd63..53f0e33ed1 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -40,13 +40,16 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
   private final List<PartitionGroupConsumptionStatus> 
_partitionGroupConsumptionStatusList;
   private Exception _exception;
   private final List<String> _topicNames;
+  private final boolean _forceGetOffsetFromStream;
 
   public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList,
+      boolean forceGetOffsetFromStream) {
     _topicNames = 
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
     _streamConfigs = streamConfigs;
     _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
     _newPartitionGroupMetadataList = new ArrayList<>();
+    _forceGetOffsetFromStream = forceGetOffsetFromStream;
   }
 
   public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -83,8 +86,8 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
         _newPartitionGroupMetadataList.addAll(
             
streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),
                 _streamConfigs.get(i),
-                topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000).stream().map(
-                metadata -> new PartitionGroupMetadata(
+                topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream).stream()
+                .map(metadata -> new PartitionGroupMetadata(
                     
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
                         metadata.getPartitionGroupId(), index),
                     metadata.getStartOffset())).collect(Collectors.toList())
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 64770d3f83..66bf9768b5 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.stream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -98,6 +99,26 @@ public interface StreamMetadataProvider extends Closeable {
     return newPartitionGroupMetadataList;
   }
 
+  /**
+   * @param forceGetOffsetFromStream - the flag is a workaround to not use 
partitionGroupConsumptionStatuses.
+   *                                  This is required because 
PinotLLCRealtimeSegmentManager.selectStartOffset()
+   *                                  actually requires the offsets from the 
stream, but was originally relying on
+   *                                  passing an empty 
partitionGroupConsumptionStatuses to the method.
+   *                                  The change for <a href="https://github.com/apache/pinot/issues/15608">...</a>
+   *                                  requires passing the actual partitionGroupConsumptionStatuses.
+   *                                  TODO - Remove the flag and fix the 
clients calling computePartitionGroupMetadata()
+   */
+  default List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, 
int timeoutMillis,
+      boolean forceGetOffsetFromStream)
+      throws IOException, TimeoutException {
+    if (forceGetOffsetFromStream) {
+      return computePartitionGroupMetadata(clientId, streamConfig, 
Collections.emptyList(), timeoutMillis);
+    } else {
+      return computePartitionGroupMetadata(clientId, streamConfig, 
partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+  }
+
   default Map<String, PartitionLagState> getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
     Map<String, PartitionLagState> result = new HashMap<>();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 48a3e63f75..2804eac53e 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -113,6 +113,11 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "periodictask", "run?taskname=" + 
taskName);
   }
 
+  public String forPeriodTaskRun(String taskName, String tableName, TableType 
tableType) {
+    return StringUtil.join("/", _baseUrl, "periodictask", "run?taskname=" + 
taskName + "&tableName=" + tableName
+        + "&type=" + tableType);
+  }
+
   public String forUpdateUserConfig(String username, String componentTypeStr, 
boolean passwordChanged) {
     StringBuilder params = new StringBuilder();
     if (StringUtils.isNotBlank(username)) {


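For reference, given the string join above, calling forPeriodTaskRun("RealtimeSegmentValidationManager", "myTable", TableType.REALTIME) on a builder whose base URL is http://localhost:9000 (task name, table name and base URL here are placeholders, not taken from the commit) would produce:

    http://localhost:9000/periodictask/run?taskname=RealtimeSegmentValidationManager&tableName=myTable&type=REALTIME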
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
