This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 056c930  Changed the segment commit protocol to send/receive 
streamPartitionMs… (#5486)
056c930 is described below

commit 056c930c6e62bbcd4cd3e8c659e16e815532dc4c
Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
AuthorDate: Wed Jun 3 13:01:24 2020 -0700

    Changed the segment commit protocol to send/receive streamPartitionMs… 
(#5486)
    
    * Changed the segment commit protocol to send/receive 
streamPartitionMsgOffset
    
    Updated the segment commit protocol so that new element 
streamPartitionMsgOffset
    is populated in requests (as request parameters) and in response (as JSON 
string element)
    
    The server side has been modified to send the 'streamPartitionMsgOffset' as 
well as the
    'offset' parameters to the controller. The controller looks for and prefers 
streamPartitionMsgOffset
    but falls back to offset if the streamPartitionMsgOffset is not there.
    
    The controller side, in the response message, populates both of the 
elements, and the server on
    the receiver side does likewise -- preferring streamPartitionMsgOffset.
    
    All callers into the protocol module have been modified to NOT set the 
offset field. Instead,
    only set the streamPartitionMsgOffset field. The 'offset' value will be 
derived from
    streamPartitionMsgOffset.
    
    Added a test to make sure that the controller always generates both 
elements. Such a test was not
    possible in the server side at this time, so verified manually.
    
    Manually ran LLCClusterIntegrationTest by disabling populating 
`streamPartitionMsgOffset` on the
    server side (old server/new controller) and on the controller response side 
(new server/old controller)
    
    Issue #5359
    
    * Addressed review comments
---
 .../protocols/SegmentCompletionProtocol.java       |  76 +++--
 .../resources/LLCSegmentCompletionHandlers.java    |  93 ++++--
 .../api/SegmentCompletionProtocolDeserTest.java    |   4 +-
 .../helix/core/realtime/SegmentCompletionTest.java | 311 +++++++++++----------
 .../realtime/LLRealtimeSegmentDataManager.java     |   2 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  70 ++---
 .../tests/SegmentCompletionIntegrationTest.java    |   5 +-
 .../pinot/spi/stream/StreamPartitionMsgOffset.java |   9 +
 8 files changed, 340 insertions(+), 230 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 3150504..8af8dbf 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -122,6 +122,7 @@ public class SegmentCompletionProtocol {
   public static final String PARAM_SEGMENT_LOCATION = "location";
   public static final String PARAM_SEGMENT_NAME = "name";
   public static final String PARAM_OFFSET = "offset";
+  public static final String PARAM_STREAM_PARTITION_MSG_OFFSET = 
"streamPartitionMsgOffset";
   public static final String PARAM_INSTANCE_ID = "instance";
   public static final String PARAM_MEMORY_USED_BYTES = "memoryUsedBytes";
   public static final String PARAM_SEGMENT_SIZE_BYTES = "segmentSizeBytes";
@@ -189,7 +190,10 @@ public class SegmentCompletionProtocol {
           + (_params.getSegmentSizeBytes() <= 0 ? ""
           : ("&" + PARAM_SEGMENT_SIZE_BYTES + "=" + 
_params.getSegmentSizeBytes())) + (_params.getNumRows() <= 0 ? ""
           : ("&" + PARAM_ROW_COUNT + "=" + _params.getNumRows())) + 
(_params.getSegmentLocation() == null ? ""
-          : ("&" + PARAM_SEGMENT_LOCATION + "=" + 
_params.getSegmentLocation()));
+          : ("&" + PARAM_SEGMENT_LOCATION + "=" + 
_params.getSegmentLocation()))
+          + (_params.getStreamPartitionMsgOffset() == null ? ""
+          : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" + 
_params.getStreamPartitionMsgOffset()))
+          ;
     }
 
     public static class Params {
@@ -204,6 +208,7 @@ public class SegmentCompletionProtocol {
       private String _segmentLocation;
       private long _memoryUsedBytes;
       private long _segmentSizeBytes;
+      private String _streamPartitionMsgOffset;
 
       public Params() {
         _offset = -1L;
@@ -216,6 +221,7 @@ public class SegmentCompletionProtocol {
         _segmentLocation = null;
         _memoryUsedBytes = MEMORY_USED_BYTES_DEFAULT;
         _segmentSizeBytes = SEGMENT_SIZE_BYTES_DEFAULT;
+        _streamPartitionMsgOffset = null;
       }
 
       public Params(Params params) {
@@ -229,8 +235,10 @@ public class SegmentCompletionProtocol {
         _segmentLocation = params.getSegmentLocation();
         _memoryUsedBytes = params.getMemoryUsedBytes();
         _segmentSizeBytes = params.getSegmentSizeBytes();
+        _streamPartitionMsgOffset = 
params.getStreamPartitionMsgOffset().toString();
       }
 
+      @Deprecated
       public Params withOffset(long offset) {
         _offset = offset;
         return this;
@@ -287,7 +295,14 @@ public class SegmentCompletionProtocol {
       }
 
       public Params withStreamPartitionMsgOffset(StreamPartitionMsgOffset 
offset) {
-        _offset = offset.getOffset();
+        _streamPartitionMsgOffset = offset.toString();
+        // Try to populate the offset if possible.
+        // TODO Remove this code once we have both sides be able to live 
without _offset.
+        try {
+          _offset = Long.parseLong(_streamPartitionMsgOffset);
+        } catch (Exception e) {
+          // Ignore, if the recipient excepts _offset, it will return an error 
to the sender.
+        }
         return this;
       }
 
@@ -295,7 +310,8 @@ public class SegmentCompletionProtocol {
         return _segmentName;
       }
 
-      public long getOffset() {
+      @Deprecated
+      private long getOffset() {
         return _offset;
       }
 
@@ -336,14 +352,19 @@ public class SegmentCompletionProtocol {
       }
 
       public StreamPartitionMsgOffset getStreamPartitionMsgOffset() {
-        return new StreamPartitionMsgOffset(_offset);
+        if (_streamPartitionMsgOffset != null) {
+          return new StreamPartitionMsgOffset(_streamPartitionMsgOffset);
+        } else {
+          return new StreamPartitionMsgOffset(_offset);
+        }
       }
 
       public String toString() {
         return "Offset: " + _offset + ",Segment name: " + _segmentName + 
",Instance Id: " + _instanceId + ",Reason: "
             + _reason + ",NumRows: " + _numRows + ",BuildTimeMillis: " + 
_buildTimeMillis + ",WaitTimeMillis: "
             + _waitTimeMillis + ",ExtraTimeSec: " + _extraTimeSec + 
",SegmentLocation: " + _segmentLocation
-            + ",MemoryUsedBytes: " + _memoryUsedBytes + ",SegmentSizeBytes: " 
+ _segmentSizeBytes;
+            + ",MemoryUsedBytes: " + _memoryUsedBytes + ",SegmentSizeBytes: " 
+ _segmentSizeBytes
+            + ",StreamPartitionMsgOffset: " + _streamPartitionMsgOffset;
       }
     }
   }
@@ -404,6 +425,7 @@ public class SegmentCompletionProtocol {
     private boolean _splitCommit;
     private String _segmentLocation;
     private String _controllerVipUrl;
+    private String _streamPartitionMsgOffset;
 
     public Response() {
     }
@@ -415,6 +437,7 @@ public class SegmentCompletionProtocol {
       _splitCommit = params.isSplitCommit();
       _segmentLocation = params.getSegmentLocation();
       _controllerVipUrl = params.getControllerVipUrl();
+      _streamPartitionMsgOffset = params.getStreamPartitionMsgOffset();
     }
 
     @JsonProperty(STATUS_KEY)
@@ -427,17 +450,26 @@ public class SegmentCompletionProtocol {
       _status = status;
     }
 
+    @Deprecated
     @JsonProperty(OFFSET_KEY)
     public long getOffset() {
       return _offset;
     }
 
     // TODO Make it a JsonProperty when we are ready to move the protocol
+    // This method is called in the server when the controller responds with
+    // CATCH_UP response to segmentConsumed() API.
     @JsonIgnore
-    public StreamPartitionMsgOffset getStreamPartitionMsgOffset() {
-      return new StreamPartitionMsgOffset(_offset);
+    public String getStreamPartitionMsgOffset() {
+      return _streamPartitionMsgOffset;
     }
 
+    @JsonIgnore
+    public void setStreamPartitionMsgOffset(String streamPartitionMsgOffset) {
+      _streamPartitionMsgOffset = streamPartitionMsgOffset;
+    }
+
+    @Deprecated
     @JsonProperty(OFFSET_KEY)
     public void setOffset(long offset) {
       _offset = offset;
@@ -485,6 +517,14 @@ public class SegmentCompletionProtocol {
       _segmentLocation = segmentLocation;
     }
 
+    public StreamPartitionMsgOffset extractOffset() {
+      if (_streamPartitionMsgOffset != null) {
+        return new StreamPartitionMsgOffset(getStreamPartitionMsgOffset());
+      } else {
+        return new StreamPartitionMsgOffset(getOffset());
+      }
+    }
+
     public String toJsonString() {
       try {
         return JsonUtils.objectToString(this);
@@ -508,6 +548,7 @@ public class SegmentCompletionProtocol {
       private boolean _splitCommit;
       private String _segmentLocation;
       private String _controllerVipUrl;
+      private String _streamPartitionMsgOffset;
 
       public Params() {
         _offset = -1L;
@@ -516,11 +557,7 @@ public class SegmentCompletionProtocol {
         _splitCommit = false;
         _segmentLocation = null;
         _controllerVipUrl = null;
-      }
-
-      public Params withOffset(long offset) {
-        _offset = offset;
-        return this;
+        _streamPartitionMsgOffset = null;
       }
 
       public Params withStatus(ControllerResponseStatus status) {
@@ -549,7 +586,13 @@ public class SegmentCompletionProtocol {
       }
 
       public Params withStreamPartitionMsgOffset(StreamPartitionMsgOffset 
offset) {
-        _offset = offset.getOffset();
+        _streamPartitionMsgOffset = offset.toString();
+        // TODO Remove the block below once we have both parties be fine 
without _offset being present.
+        try {
+          _offset = Long.parseLong(_streamPartitionMsgOffset);
+        } catch (Exception e) {
+          // Ignore. If the received expects _offset, it will return an error 
to the sender.
+        }
         return this;
       }
 
@@ -557,7 +600,8 @@ public class SegmentCompletionProtocol {
         return _status;
       }
 
-      public long getOffset() {
+      @Deprecated
+      private long getOffset() {
         return _offset;
       }
 
@@ -577,8 +621,8 @@ public class SegmentCompletionProtocol {
         return _controllerVipUrl;
       }
 
-      public StreamPartitionMsgOffset getStreamPartitionMsgOffset() {
-        return new StreamPartitionMsgOffset(_offset);
+      public String getStreamPartitionMsgOffset() {
+        return _streamPartitionMsgOffset;
       }
     }
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 6ccc682..20c3298 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -50,6 +50,7 @@ import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.slf4j.Logger;
@@ -79,10 +80,12 @@ public class LLCSegmentCompletionHandlers {
   public String 
extendBuildTime(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC) int 
extraTimeSec) {
 
-    if (instanceId == null || segmentName == null || offset == -1) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", 
offset, segmentName, instanceId);
+    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}",
+          offset, segmentName, instanceId, streamPartitionMsgOffset);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
     if (extraTimeSec <= 0) {
@@ -92,8 +95,9 @@ public class LLCSegmentCompletionHandlers {
     }
 
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
-        .withExtraTimeSec(extraTimeSec);
+    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withExtraTimeSec(extraTimeSec);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
+
     LOGGER.info("Processing extendBuildTime:{}", requestParams.toString());
 
     SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.extendBuildTime(requestParams);
@@ -103,23 +107,39 @@ public class LLCSegmentCompletionHandlers {
     return responseStr;
   }
 
+  private void 
extractOffsetFromParams(SegmentCompletionProtocol.Request.Params requestParams,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset) {
+    // If the sender sent us a stream partition message offset, use it. If 
not, the sender is still old
+    // version, so pick up the old offset from it.
+    // TODO Remove this backup use of offset when server and controller are 
upgraded.
+    if (streamPartitionMsgOffset != null) {
+      requestParams.withStreamPartitionMsgOffset(new 
StreamPartitionMsgOffset(streamPartitionMsgOffset));
+    } else {
+      requestParams.withOffset(offset);
+    }
+  }
+
   @GET
   @Path(SegmentCompletionProtocol.MSG_TYPE_CONSUMED)
   @Produces(MediaType.APPLICATION_JSON)
   public String 
segmentConsumed(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows) {
 
-    if (instanceId == null || segmentName == null || offset == -1) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", 
offset, segmentName, instanceId);
+    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}",
+          offset, segmentName, instanceId, streamPartitionMsgOffset);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset).withReason(stopReason)
+    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason)
         .withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());
 
     SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentConsumed(requestParams);
@@ -134,14 +154,17 @@ public class LLCSegmentCompletionHandlers {
   public String 
segmentStoppedConsuming(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason) {
 
-    if (instanceId == null || segmentName == null || offset == -1) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", 
offset, segmentName, instanceId);
+    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}",
+          offset, segmentName, instanceId, streamPartitionMsgOffset);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset).withReason(stopReason);
+    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentStoppedConsuming:{}", 
requestParams.toString());
 
     SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentStoppedConsuming(requestParams);
@@ -156,20 +179,25 @@ public class LLCSegmentCompletionHandlers {
   public String 
segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) 
String instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes) {
-    if (instanceId == null || segmentName == null || offset == -1) {
+
+    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}",
+          offset, segmentName, instanceId, streamPartitionMsgOffset);
       LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", 
offset, segmentName, instanceId);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
 
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
-        
.withMemoryUsedBytes(memoryUsedBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
-        .withNumRows(numRows).withSegmentSizeBytes(segmentSizeBytes);
+    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withMemoryUsedBytes(memoryUsedBytes)
+        
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+        .withSegmentSizeBytes(segmentSizeBytes);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
 
     LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
 
@@ -187,14 +215,17 @@ public class LLCSegmentCompletionHandlers {
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
       @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes) {
-    if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
-          instanceId, segmentLocation);
+    if (instanceId == null || segmentName == null
+        || segmentLocation == null
+        || (offset == -1 && streamPartitionMsgOffset == null)) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}, streamPartitionMsgOffset={}",
+          offset, segmentName, instanceId, segmentLocation, 
streamPartitionMsgOffset);
       // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
@@ -208,10 +239,11 @@ public class LLCSegmentCompletionHandlers {
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+    requestParams.withInstanceId(instanceId).withSegmentName(segmentName)
         
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
         
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
         .withMemoryUsedBytes(memoryUsedBytes);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
 
     final boolean isSuccess = true;
@@ -233,15 +265,18 @@ public class LLCSegmentCompletionHandlers {
   public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, 
FormDataMultiPart multiPart) {
+
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+    requestParams.withInstanceId(instanceId).withSegmentName(segmentName)
         
.withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
         .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentCommit:{}", requestParams.toString());
 
     final SegmentCompletionManager segmentCompletionManager = 
_segmentCompletionManager;
@@ -296,7 +331,7 @@ public class LLCSegmentCompletionHandlers {
 
     response = segmentCompletionManager.segmentCommitEnd(requestParams, 
success, false, committingSegmentDescriptor);
     LOGGER.info("Response to segmentCommit: instance={}  segment={} status={} 
offset={}", requestParams.getInstanceId(),
-        requestParams.getSegmentName(), response.getStatus(), 
response.getOffset());
+        requestParams.getSegmentName(), response.getStatus(), 
response.extractOffset());
 
     return response.toJsonString();
   }
@@ -310,9 +345,12 @@ public class LLCSegmentCompletionHandlers {
   @Consumes(MediaType.MULTIPART_FORM_DATA)
   public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
+      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
+      FormDataMultiPart multiPart) {
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
+    requestParams.withInstanceId(instanceId).withSegmentName(segmentName);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
     // Get the segment from the form input and put it into the data directory 
(could be remote)
@@ -349,24 +387,27 @@ public class LLCSegmentCompletionHandlers {
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
       @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
       FormDataMultiPart metadataFiles) {
-    if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
-          instanceId, segmentLocation);
+    if (instanceId == null || segmentName == null || segmentLocation == null 
|| metadataFiles == null
+      || (offset == -1 && streamPartitionMsgOffset == null)) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}, streamPartitionMsgOffset={}",
+          offset, segmentName, instanceId, segmentLocation, 
streamPartitionMsgOffset);
       // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
 
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+    requestParams.withInstanceId(instanceId).withSegmentName(segmentName)
         
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
         
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
         .withMemoryUsedBytes(memoryUsedBytes);
+    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentCommitEndWithMetadata:{}", 
requestParams.toString());
 
     SegmentMetadataImpl segmentMetadata;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
index 15a443e..72486ba 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
@@ -47,7 +47,7 @@ public class SegmentCompletionProtocolDeserTest {
 
     SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
     assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS);
-    assertEquals(response.getStreamPartitionMsgOffset().compareTo(OFFSET), 0);
+    assertEquals(new 
StreamPartitionMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET),
 0);
     assertEquals(response.getSegmentLocation(), SEGMENT_LOCATION);
     assertTrue(response.isSplitCommit());
     assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
@@ -63,7 +63,7 @@ public class SegmentCompletionProtocolDeserTest {
 
     SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
     assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS);
-    assertEquals(response.getStreamPartitionMsgOffset().compareTo(OFFSET), 0);
+    assertEquals(new 
StreamPartitionMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET),
 0);
     assertNull(response.getSegmentLocation());
     assertFalse(response.isSplitCommit());
     assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 3bd88da..d8dda56 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -54,9 +55,9 @@ public class SegmentCompletionTest {
   private final String s2 = "S2";
   private final String s3 = "S3";
 
-  private final long s1Offset = 20L;
-  private final long s2Offset = 40L;
-  private final long s3Offset = 30L;
+  private final StreamPartitionMsgOffset s1Offset = new StreamPartitionMsgOffset(20L);
+  private final StreamPartitionMsgOffset s2Offset = new StreamPartitionMsgOffset(40L);
+  private final StreamPartitionMsgOffset s3Offset = new StreamPartitionMsgOffset(30L);
 
   @BeforeMethod
   public void testCaseSetup()
@@ -64,6 +65,14 @@ public class SegmentCompletionTest {
     testCaseSetup(true, true);
   }
 
+  private void verifyOffset(SegmentCompletionProtocol.Response response, StreamPartitionMsgOffset offset) {
+    Assert.assertEquals(new StreamPartitionMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(offset), 0);
+    // Compatibility test:
+    // The controller must always respond with BOTH fields -- 'offset' as well as 'streamPartitionMsgOffset', and they
+    // should be the same value.
+    Assert.assertEquals(response.getOffset(), offset.getOffset());
+  }
+
   public void testCaseSetup(boolean isLeader, boolean isConnected)
       throws Exception {
     PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
@@ -114,18 +123,18 @@ public class SegmentCompletionTest {
 
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._seconds = 5;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
     params =
-        new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr).withReason(reason);
+        new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr).withReason(reason);
     response = segmentCompletionMgr.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
     Assert.assertEquals(new LLCSegmentName(segmentNameStr), 
segmentManager._stoppedSegmentName);
@@ -135,27 +144,27 @@ public class SegmentCompletionTest {
 
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     // s3 comes back with new caught up offset, it should get a HOLD, since commit is not done yet.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 executes a successful commit
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
 
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -165,7 +174,7 @@ public class SegmentCompletionTest {
 
     // Now if s3 or s1 come back, they are asked to keep the segment they have.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
 
@@ -182,7 +191,7 @@ public class SegmentCompletionTest {
     // s1 stops consuming at t = 5;
     segmentCompletionMgr._seconds = 5;
     params =
-        new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr).withReason(reason);
+        new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr).withReason(reason);
     response = segmentCompletionMgr.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
     Assert.assertEquals(new LLCSegmentName(segmentNameStr), 
segmentManager._stoppedSegmentName);
@@ -192,40 +201,40 @@ public class SegmentCompletionTest {
 
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // s3 sends offset of 30, gets catchup to 40, s2 should have been decided as the winner now
     // since we are never expecting to hear back from s1
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
 
     // s3 happens to come back (after catchup to s2offset) before s2 should get a hold since s2 has been decided as
     // the winner.
     // TODO Can optimize here since s2 is not notified yet.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     // s2 executes a successful commit
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
 
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -235,7 +244,7 @@ public class SegmentCompletionTest {
 
     // Now if s3 or s1 come back, they are asked to keep the segment they have.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
 
@@ -251,7 +260,7 @@ public class SegmentCompletionTest {
     Request.Params params;
     segmentCompletionMgr._seconds = 5;
 
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withReason("some reason");
     response = segmentCompletionMgr.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
@@ -284,7 +293,7 @@ public class SegmentCompletionTest {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     segmentCompletionMgr._seconds = 10;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.FAILED);
   }
@@ -297,46 +306,46 @@ public class SegmentCompletionTest {
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._seconds = 5L;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     // TODO: Verify controller asked to do a split commit
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     // s3 comes back with new caught up offset, it should get a HOLD, since 
commit is not done yet.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 executes a successful commit start
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     // s2's file does not successfully commit because MockPinotLLCRealtimeSegmentManager.commitSegmentFile() returns
     // false when detecting SegmentLocation == "doNotCommitMe";
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("doNotCommitMe");
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
@@ -347,30 +356,30 @@ public class SegmentCompletionTest {
 
     // Now s1 comes back; it is asked to hold.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // Now s3 comes back; it is asked to commit
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
 
     // Now s2 comes back; it is asked to hold
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
 
     // s3 executes a successful commit start
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     // s3's file successfully commits
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
@@ -385,45 +394,45 @@ public class SegmentCompletionTest {
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._seconds = startTime;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     // TODO: Verify controller asked to do a split commit
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     // s3 comes back with new caught up offset, it should get a HOLD, since 
commit is not done yet.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 executes a successful commit
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
 
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
@@ -434,7 +443,7 @@ public class SegmentCompletionTest {
 
     // Now if s3 or s1 come back, they are asked to keep the segment they have.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
 
@@ -450,30 +459,30 @@ public class SegmentCompletionTest {
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._seconds = 5L;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 20, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 20, gets commit
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
-    Assert.assertEquals(response.getOffset(), s1Offset);
+    verifyOffset(response, s1Offset);
 
     // s3 sends a commit start with 20
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
 
     // s3 comes back to try to commit with a different offset
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
@@ -484,7 +493,7 @@ public class SegmentCompletionTest {
 
     // Now if s2 or s1 come back, they are asked to hold.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
@@ -498,43 +507,43 @@ public class SegmentCompletionTest {
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._seconds = startTime;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     // s3 comes back with new caught up offset, it should get a HOLD, since 
commit is not done yet.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 executes a successful commit
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
 
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -544,7 +553,7 @@ public class SegmentCompletionTest {
 
     // Now if s3 or s1 come back, they are asked to keep the segment they have.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
 
@@ -560,7 +569,7 @@ public class SegmentCompletionTest {
     Request.Params params;
     // s1 sends offset of 20, gets NOT_LEADER at t = 5s;
     segmentCompletionMgr._seconds = 5L;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr)
         .withReason("rowLimit");
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.NOT_LEADER);
@@ -572,7 +581,7 @@ public class SegmentCompletionTest {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     segmentCompletionMgr._seconds = 10L;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
@@ -584,40 +593,40 @@ public class SegmentCompletionTest {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     segmentCompletionMgr._seconds = 10L;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
     // S2 comes with the same offset as S1
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
     segmentCompletionMgr._seconds += 1;
     // S3 comes with a different offset and without row limit. we ask it to 
hold even though it is higher.
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
     // S1 comes back to commit the segment
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
 
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // We ask S2 to keep the segment
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.KEEP);
     // And we ask S3 to discard because it was ahead.
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.DISCARD);
@@ -643,37 +652,38 @@ public class SegmentCompletionTest {
     // s1 sends offset of 20, gets HOLD at t = 5s;
     final int startTimeSecs = 5;
     segmentCompletionMgr._seconds = startTimeSecs;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // Now s1 comes back again, and is asked to hold
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 
SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 1000;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     // Now s3 comes up with a better offset, but we ask it to hold, since the 
committer has not committed yet.
     segmentCompletionMgr._seconds += 1;
-    params = new Request.Params().withInstanceId(s3).withOffset(s2Offset + 
10).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withSegmentName(segmentNameStr);
+    params.withStreamPartitionMsgOffset(new 
StreamPartitionMsgOffset(s2Offset.getOffset() + 10));
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 commits.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr.segmentCommitEnd(params, true, 
isSplitCommit,
         CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
@@ -683,7 +693,8 @@ public class SegmentCompletionTest {
 
     // Now s3 comes back to get a discard.
     segmentCompletionMgr._seconds += 1;
-    params = new Request.Params().withInstanceId(s3).withOffset(s2Offset + 
10).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withSegmentName(segmentNameStr);
+    params.withStreamPartitionMsgOffset(new 
StreamPartitionMsgOffset(s2Offset.getOffset() + 10));
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.DISCARD);
     // Now the FSM should have disappeared from the map
@@ -699,42 +710,42 @@ public class SegmentCompletionTest {
     // s1 sends offset of 20, gets HOLD at t = 5s;
     final int startTimeSecs = 5;
     segmentCompletionMgr._seconds = startTimeSecs;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 is asked to catch up.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
 
     // Now s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     // All servers are dead
     segmentCompletionMgr._seconds += 3600;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
 
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
     Assert.assertTrue(fsmMap.containsKey(segmentNameStr));
 
     // Now s2 is asked to commit because the max time to pick committer has 
passed.
     segmentCompletionMgr._seconds += 4;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
   }
@@ -748,23 +759,23 @@ public class SegmentCompletionTest {
     // s1 sends offset of 20, gets HOLD at t = 5s;
     final int startTimeSecs = 5;
     segmentCompletionMgr._seconds = startTimeSecs;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 is asked to catch up.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
@@ -772,7 +783,7 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._seconds += 
SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 1000;
 
     // But since s1 and s3 are in HOLDING state, they should come back again. 
s1 is asked to catchup
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
 
@@ -782,7 +793,7 @@ public class SegmentCompletionTest {
 
     // s1 comes back with the updated offset, since it was asked to catch up.
     // The FSM will be aborted, and destroyed ...
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
@@ -790,19 +801,19 @@ public class SegmentCompletionTest {
 
     // s1 comes back again, a new FSM created
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // s3 comes back
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // And winner chosen when the last one does not come back at all
     segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
@@ -820,28 +831,28 @@ public class SegmentCompletionTest {
     final String tableName = new LLCSegmentName(segmentNameStr).getTableName();
     Assert.assertNull(commitTimeMap.get(tableName));
     segmentCompletionMgr._seconds = startTime;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     long commitTimeSec = response.getBuildTimeSeconds();
@@ -849,7 +860,7 @@ public class SegmentCompletionTest {
 
     // Fast forward to one second before commit time, and send a lease renewal 
request for 20s
     segmentCompletionMgr._seconds = startTime + commitTimeSec - 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.PROCESSED);
@@ -857,7 +868,7 @@ public class SegmentCompletionTest {
 
     // Another lease extension in 19s.
     segmentCompletionMgr._seconds += 19;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.PROCESSED);
@@ -865,7 +876,7 @@ public class SegmentCompletionTest {
 
     // Commit in 15s
     segmentCompletionMgr._seconds += 15;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     long commitTimeMs = (segmentCompletionMgr._seconds - startTime) * 1000;
@@ -887,28 +898,28 @@ public class SegmentCompletionTest {
     // s1 sends offset of 20, gets HOLD at t = 5s;
     final long startTime = 5;
     segmentCompletionMgr._seconds = startTime;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     long commitTimeSec = response.getBuildTimeSeconds();
@@ -916,7 +927,7 @@ public class SegmentCompletionTest {
 
     // Fast forward to one second before commit time, and send a lease renewal 
request for 20s
     segmentCompletionMgr._seconds = startTime + commitTimeSec - 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.PROCESSED);
@@ -924,7 +935,7 @@ public class SegmentCompletionTest {
 
     // Come back too late.
     segmentCompletionMgr._seconds += 25;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
     // now FSM should be out of the map.
@@ -940,28 +951,28 @@ public class SegmentCompletionTest {
     // s1 sends offset of 20, gets HOLD at t = 5s;
     final long startTime = 5;
     segmentCompletionMgr._seconds = startTime;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     long commitTimeSec = response.getBuildTimeSeconds();
@@ -969,7 +980,7 @@ public class SegmentCompletionTest {
 
     // Fast forward to one second before commit time, and send a lease renewal 
request for 20s
     segmentCompletionMgr._seconds = startTime + commitTimeSec - 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.PROCESSED);
@@ -979,7 +990,7 @@ public class SegmentCompletionTest {
     // Lease will not be granted if the time taken so far plus lease time 
exceeds the max allowable.
     while (segmentCompletionMgr._seconds + leaseTimeSec <= startTime + 
SegmentCompletionManager
         .getMaxCommitTimeForAllSegmentsSeconds()) {
-      params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+      params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
           .withExtraTimeSec(leaseTimeSec);
       response = segmentCompletionMgr.extendBuildTime(params);
       Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.PROCESSED);
@@ -988,7 +999,7 @@ public class SegmentCompletionTest {
     }
 
     // Now the lease request should fail.
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withExtraTimeSec(leaseTimeSec);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.FAILED);
@@ -1002,28 +1013,28 @@ public class SegmentCompletionTest {
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._seconds = 5;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
@@ -1031,24 +1042,24 @@ public class SegmentCompletionTest {
     replaceSegmentCompletionManager();
 
     // s3 comes back with the correct offset but is asked to hold.
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // s1 comes back, and still asked to hold.
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // s2 has no idea the controller failed, so it comes back with a 
commit, but the controller asks it to hold,
     // (essentially a commit failure)
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     
Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
 
     // So s2 goes back into HOLDING state. s1 and s3 are already holding, so 
now it will get COMMIT back.
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
   }
@@ -1062,28 +1073,28 @@ public class SegmentCompletionTest {
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._seconds = 5;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s2 sends offset of 40, gets HOLD
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
     // s3 sends offset of 30, gets catchup to 40
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    Assert.assertEquals(response.getOffset(), s2Offset);
+    verifyOffset(response, s2Offset);
     // Now s1 comes back, and is asked to catchup.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
     // s2 is asked to commit.
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
@@ -1091,24 +1102,24 @@ public class SegmentCompletionTest {
     replaceSegmentCompletionManager();
 
     // s3 comes back with the correct offset but is asked to hold.
-    params = new 
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // s1 comes back, and still asked to hold.
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
 
     // s2 has no idea the controller failed, so it comes back with a 
commit, but the controller asks it to hold,
     // (essentially a commit failure)
     segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     
Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
 
     // So s2 goes back into HOLDING state. s1 and s3 are already holding, so 
now it will get COMMIT back.
-    params = new 
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new 
Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
@@ -1121,11 +1132,11 @@ public class SegmentCompletionTest {
     SegmentCompletionProtocol.Response response;
     SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
 
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER);
 
-    params = new 
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
+    params = new 
Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr);
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index d13344a..95633d5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -536,7 +536,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
           _state = State.HOLDING;
           SegmentCompletionProtocol.Response response = 
postSegmentConsumedMsg();
           SegmentCompletionProtocol.ControllerResponseStatus status = 
response.getStatus();
-          StreamPartitionMsgOffset rspOffset = new 
StreamPartitionMsgOffset(response.getOffset());
+          StreamPartitionMsgOffset rspOffset = response.extractOffset();
           boolean success;
           switch (status) {
             case NOT_LEADER:
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index a682c85..3e3020a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -41,6 +41,7 @@ import 
org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -69,7 +70,8 @@ public class LLRealtimeSegmentDataManagerTest {
   private static final LLCSegmentName _segmentName =
       new LLCSegmentName(_tableName, _partitionId, _sequenceId, _segTimeMs);
   private static final String _segmentNameStr = _segmentName.getSegmentName();
-  private static final long _startOffset = 19885L;
+  private static final long _startOffsetValue = 19885L;
+  private static final StreamPartitionMsgOffset _startOffset = new 
StreamPartitionMsgOffset(_startOffsetValue);
   private static final String _topicName = "someTopic";
   private static final int maxRowsInSegment = 250000;
   private static final long maxTimeForSegmentCloseMs = 64368000L;
@@ -134,7 +136,7 @@ public class LLRealtimeSegmentDataManagerTest {
 
     LLCRealtimeSegmentZKMetadata segmentZKMetadata = new 
LLCRealtimeSegmentZKMetadata();
     segmentZKMetadata.setSegmentName(_segmentNameStr);
-    segmentZKMetadata.setStartOffset(_startOffset);
+    segmentZKMetadata.setStartOffset(_startOffset.getOffset());
     segmentZKMetadata.setCreationTime(System.currentTimeMillis());
     return segmentZKMetadata;
   }
@@ -173,12 +175,12 @@ public class LLRealtimeSegmentDataManagerTest {
       throws Exception {
     FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
     LLRealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
-    final long endOffset = _startOffset + 500;
+    final StreamPartitionMsgOffset endOffset = new 
StreamPartitionMsgOffset(_startOffsetValue + 500);
     // We should consume initially...
     segmentDataManager._consumeOffsets.add(endOffset);
     final SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(
         new SegmentCompletionProtocol.Response.Params()
-            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(endOffset));
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withStreamPartitionMsgOffset(endOffset));
     // And then never consume as long as we get a hold response, 100 times.
     for (int i = 0; i < 100; i++) {
       segmentDataManager._responses.add(response);
@@ -202,14 +204,14 @@ public class LLRealtimeSegmentDataManagerTest {
       throws Exception {
     FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
     LLRealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
-    final long endOffset = _startOffset + 500;
+    final StreamPartitionMsgOffset endOffset = new 
StreamPartitionMsgOffset(_startOffsetValue + 500);
     // We should consume initially...
     segmentDataManager._consumeOffsets.add(endOffset);
     final SegmentCompletionProtocol.Response holdResponse = new 
SegmentCompletionProtocol.Response(
-        new SegmentCompletionProtocol.Response.Params().withOffset(endOffset)
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
     final SegmentCompletionProtocol.Response commitResponse = new 
SegmentCompletionProtocol.Response(
-        new SegmentCompletionProtocol.Response.Params().withOffset(endOffset)
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT));
     // And then never consume as long as we get a hold response, 100 times.
     segmentDataManager._responses.add(holdResponse);
@@ -233,11 +235,11 @@ public class LLRealtimeSegmentDataManagerTest {
       throws Exception {
     FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
     LLRealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
-    final long endOffset = _startOffset + 500;
+    final StreamPartitionMsgOffset endOffset = new 
StreamPartitionMsgOffset(_startOffsetValue + 500);
     // We should consume initially...
     segmentDataManager._consumeOffsets.add(endOffset);
     final SegmentCompletionProtocol.Response commitResponse = new 
SegmentCompletionProtocol.Response(
-        new SegmentCompletionProtocol.Response.Params().withOffset(endOffset)
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT));
     segmentDataManager._responses.add(commitResponse);
     segmentDataManager._failSegmentBuild = true;
@@ -254,23 +256,24 @@ public class LLRealtimeSegmentDataManagerTest {
       throws Exception {
     FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
     LLRealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
-    final long firstOffset = _startOffset + 500;
-    final long catchupOffset = firstOffset + 10;
+    final StreamPartitionMsgOffset firstOffset = new 
StreamPartitionMsgOffset(_startOffsetValue + 500);
+    final StreamPartitionMsgOffset catchupOffset = new 
StreamPartitionMsgOffset(firstOffset.getOffset() + 10);
     // We should consume initially...
     segmentDataManager._consumeOffsets.add(firstOffset);
     segmentDataManager._consumeOffsets.add(catchupOffset); // Offset after 
catchup
     final SegmentCompletionProtocol.Response holdResponse1 = new 
SegmentCompletionProtocol.Response(
         new SegmentCompletionProtocol.Response.Params()
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).
-            withOffset(firstOffset));
+            withStreamPartitionMsgOffset(firstOffset));
     final SegmentCompletionProtocol.Response catchupResponse = new 
SegmentCompletionProtocol.Response(
         new SegmentCompletionProtocol.Response.Params()
-            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP).withOffset(catchupOffset));
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP)
+            .withStreamPartitionMsgOffset(catchupOffset));
     final SegmentCompletionProtocol.Response holdResponse2 = new 
SegmentCompletionProtocol.Response(
-        new 
SegmentCompletionProtocol.Response.Params().withOffset(catchupOffset)
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
     final SegmentCompletionProtocol.Response commitResponse = new 
SegmentCompletionProtocol.Response(
-        new 
SegmentCompletionProtocol.Response.Params().withOffset(catchupOffset)
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT));
     // And then never consume as long as we get a hold response, 100 times.
     segmentDataManager._responses.add(holdResponse1);
@@ -296,10 +299,10 @@ public class LLRealtimeSegmentDataManagerTest {
       throws Exception {
     FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
     LLRealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
-    final long endOffset = _startOffset + 500;
+    final StreamPartitionMsgOffset endOffset = new 
StreamPartitionMsgOffset(_startOffsetValue + 500);
     segmentDataManager._consumeOffsets.add(endOffset);
     final SegmentCompletionProtocol.Response discardResponse = new 
SegmentCompletionProtocol.Response(
-        new SegmentCompletionProtocol.Response.Params().withOffset(endOffset)
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.DISCARD));
     segmentDataManager._responses.add(discardResponse);
 
@@ -321,10 +324,10 @@ public class LLRealtimeSegmentDataManagerTest {
       throws Exception {
     FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
     LLRealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
-    final long endOffset = _startOffset + 500;
+    final StreamPartitionMsgOffset endOffset = new 
StreamPartitionMsgOffset(_startOffsetValue + 500);
     segmentDataManager._consumeOffsets.add(endOffset);
     SegmentCompletionProtocol.Response.Params params = new 
SegmentCompletionProtocol.Response.Params();
-    
params.withOffset(endOffset).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
+    
params.withStreamPartitionMsgOffset(endOffset).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
     final SegmentCompletionProtocol.Response keepResponse = new 
SegmentCompletionProtocol.Response(params);
     segmentDataManager._responses.add(keepResponse);
 
@@ -345,11 +348,11 @@ public class LLRealtimeSegmentDataManagerTest {
       throws Exception {
     FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
     LLRealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
-    final long endOffset = _startOffset + 500;
+    final StreamPartitionMsgOffset endOffset = new 
StreamPartitionMsgOffset(_startOffsetValue + 500);
     // We should consume initially...
     segmentDataManager._consumeOffsets.add(endOffset);
     final SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(
-        new SegmentCompletionProtocol.Response.Params().withOffset(endOffset)
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER));
     // And then never consume as long as we get a Not leader response, 100 
times.
     for (int i = 0; i < 100; i++) {
@@ -389,8 +392,9 @@ public class LLRealtimeSegmentDataManagerTest {
   public void testOnlineTransitionAfterStop()
       throws Exception {
     LLCRealtimeSegmentZKMetadata metadata = new LLCRealtimeSegmentZKMetadata();
-    final long finalOffset = _startOffset + 600;
-    metadata.setEndOffset(finalOffset);
+    final long finalOffsetValue = _startOffsetValue + 600;
+    final StreamPartitionMsgOffset finalOffset = new 
StreamPartitionMsgOffset(finalOffsetValue);
+    metadata.setEndOffset(finalOffset.getOffset());
 
     {
       FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
@@ -437,7 +441,7 @@ public class LLRealtimeSegmentDataManagerTest {
       FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
       segmentDataManager._stopWaitTimeMs = 0;
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.HOLDING);
-      segmentDataManager.setCurrentOffset(finalOffset + 1);
+      segmentDataManager.setCurrentOffset(finalOffsetValue + 1);
       segmentDataManager.goOnlineFromConsuming(metadata);
       Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
       Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
@@ -449,7 +453,7 @@ public class LLRealtimeSegmentDataManagerTest {
       FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
       segmentDataManager._stopWaitTimeMs = 0;
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.CATCHING_UP);
-      segmentDataManager.setCurrentOffset(finalOffset + 1);
+      segmentDataManager.setCurrentOffset(finalOffsetValue + 1);
       segmentDataManager.goOnlineFromConsuming(metadata);
       Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
       Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
@@ -461,7 +465,7 @@ public class LLRealtimeSegmentDataManagerTest {
       FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
       segmentDataManager._stopWaitTimeMs = 0;
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.CATCHING_UP);
-      segmentDataManager._consumeOffsets.add(finalOffset - 1);
+      segmentDataManager._consumeOffsets.add(new 
StreamPartitionMsgOffset(finalOffsetValue - 1));
       segmentDataManager.goOnlineFromConsuming(metadata);
       Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
       Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
@@ -515,7 +519,7 @@ public class LLRealtimeSegmentDataManagerTest {
     {
       FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.CATCHING_UP);
-      final long finalOffset = _startOffset + 100;
+      final long finalOffset = _startOffsetValue + 100;
       segmentDataManager.setFinalOffset(finalOffset);
       segmentDataManager.setCurrentOffset(finalOffset - 1);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
@@ -528,7 +532,7 @@ public class LLRealtimeSegmentDataManagerTest {
       FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
       _timeNow += maxTimeForSegmentCloseMs;
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.CATCHING_UP);
-      final long finalOffset = _startOffset + 100;
+      final long finalOffset = _startOffsetValue + 100;
       segmentDataManager.setFinalOffset(finalOffset);
       segmentDataManager.setCurrentOffset(finalOffset - 1);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
@@ -543,7 +547,7 @@ public class LLRealtimeSegmentDataManagerTest {
       _timeNow += 1;
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
       segmentDataManager.setConsumeEndTime(_timeNow + 10);
-      final long finalOffset = _startOffset + 100;
+      final long finalOffset = _startOffsetValue + 100;
       segmentDataManager.setFinalOffset(finalOffset);
       segmentDataManager.setCurrentOffset(finalOffset - 1);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
@@ -557,7 +561,7 @@ public class LLRealtimeSegmentDataManagerTest {
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
       final long endTime = _timeNow + 10;
       segmentDataManager.setConsumeEndTime(endTime);
-      final long finalOffset = _startOffset + 100;
+      final long finalOffset = _startOffsetValue + 100;
       segmentDataManager.setFinalOffset(finalOffset);
       segmentDataManager.setCurrentOffset(finalOffset - 1);
       _timeNow = endTime - 1;
@@ -627,7 +631,7 @@ public class LLRealtimeSegmentDataManagerTest {
     // Set up the responses so that we get a failed response first and then a 
success response.
     segmentDataManager._responses.add(commitFailed);
     final long leaseTime = 50000L;
-    final long finalOffset = _startOffset + 600;
+    final long finalOffset = _startOffsetValue + 600;
     segmentDataManager.setCurrentOffset(finalOffset);
 
     // We have set up commit to fail, so we should carry over the segment file.
@@ -703,7 +707,7 @@ public class LLRealtimeSegmentDataManagerTest {
     public Field _state;
     public Field _shouldStop;
     public Field _stopReason;
-    public LinkedList<Long> _consumeOffsets = new LinkedList<>();
+    public LinkedList<StreamPartitionMsgOffset> _consumeOffsets = new 
LinkedList<>();
     public LinkedList<SegmentCompletionProtocol.Response> _responses = new 
LinkedList<>();
     public boolean _commitSegmentCalled = false;
     public boolean _buildSegmentCalled = false;
@@ -787,7 +791,7 @@ public class LLRealtimeSegmentDataManagerTest {
       if (_throwExceptionFromConsume) {
         throw new PermanentConsumerException(new Throwable("Offset out of 
range"));
       }
-      setCurrentOffset(_consumeOffsets.remove());
+      setCurrentOffset(_consumeOffsets.remove().getOffset());
       terminateLoopIfNecessary();
       return true;
     }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
index a03ca98..d398d9f 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
@@ -44,6 +44,7 @@ import 
org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import 
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -142,8 +143,8 @@ public class SegmentCompletionIntegrationTest extends 
BaseClusterIntegrationTest
     ServerSegmentCompletionProtocolHandler protocolHandler =
         new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new 
MetricsRegistry()), realtimeTableName);
     SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
-    
params.withOffset(45688L).withSegmentName(_currentSegment).withReason("RandomReason")
-        .withInstanceId(_serverInstance);
+    params.withStreamPartitionMsgOffset(new 
StreamPartitionMsgOffset(45688L)).withSegmentName(_currentSegment)
+        .withReason("RandomReason") .withInstanceId(_serverInstance);
     SegmentCompletionProtocol.Response response = 
protocolHandler.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
index e571b91..c795448 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
@@ -42,6 +42,15 @@ public class StreamPartitionMsgOffset implements Comparable {
     _offset = offset;
   }
 
+  /**
+   * This constructor will go away when this class becomes an interface
+   * @param offsetStr the offset as a decimal string; it is parsed with {@link Long#parseLong(String)}
+   */
+  @Deprecated
+  public StreamPartitionMsgOffset(String offsetStr) {
+    _offset = Long.parseLong(offsetStr);
+  }
+
   @Deprecated
   public void setOffset(long offset) {
     _offset = offset;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to