This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 056c930 Changed the segment commit protocol to send/receive streamPartitionMs… (#5486) 056c930 is described below commit 056c930c6e62bbcd4cd3e8c659e16e815532dc4c Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Wed Jun 3 13:01:24 2020 -0700 Changed the segment commit protocol to send/receive streamPartitionMs… (#5486) * Changed the segment commit protocol to send/receive streamPartitionMsgOffset Updated the segment commit protocol so that new element streamPartitionMsgOffset is populated in requests (as request parameters) and in response (as JSON string element) The server side has been modified to send the 'streamPartitionMsgOffset' as well as we the 'offset' parameters to the controller. The controller looks for and prefers streamPartitionMsgOffset but falls back to offset if the streamPartitionMsgOffset is not there. The controller side, in the repsonse message, populates both of the elements, and the server on the receiver side does likewise -- preferring streamPartitionMsgOffset. All callers into the protocol module have been modified to NOT set the offset field. Instead, only set the streamPartitionMsgOffset field. The 'offset' value will be derived from streamPartitionMsgOffset. Added a test to make sure that the controller always generates both elements. Such a test was not possible in the server side at this time, so verified manually. Manually ran LLCClusterIntergrationTest by disabling populating `streamPartitionMsgOffset` on the server side (old server/new controller) and on the controller respons side (new server/old controller) Issue #5359 * Addressed review comments --- .../protocols/SegmentCompletionProtocol.java | 76 +++-- .../resources/LLCSegmentCompletionHandlers.java | 93 ++++-- .../api/SegmentCompletionProtocolDeserTest.java | 4 +- .../helix/core/realtime/SegmentCompletionTest.java | 311 +++++++++++---------- .../realtime/LLRealtimeSegmentDataManager.java | 2 +- .../realtime/LLRealtimeSegmentDataManagerTest.java | 70 ++--- .../tests/SegmentCompletionIntegrationTest.java | 5 +- .../pinot/spi/stream/StreamPartitionMsgOffset.java | 9 + 8 files changed, 340 insertions(+), 230 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java index 3150504..8af8dbf 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java @@ -122,6 +122,7 @@ public class SegmentCompletionProtocol { public static final String PARAM_SEGMENT_LOCATION = "location"; public static final String PARAM_SEGMENT_NAME = "name"; public static final String PARAM_OFFSET = "offset"; + public static final String PARAM_STREAM_PARTITION_MSG_OFFSET = "streamPartitionMsgOffset"; public static final String PARAM_INSTANCE_ID = "instance"; public static final String PARAM_MEMORY_USED_BYTES = "memoryUsedBytes"; public static final String PARAM_SEGMENT_SIZE_BYTES = "segmentSizeBytes"; @@ -189,7 +190,10 @@ public class SegmentCompletionProtocol { + (_params.getSegmentSizeBytes() <= 0 ? "" : ("&" + PARAM_SEGMENT_SIZE_BYTES + "=" + _params.getSegmentSizeBytes())) + (_params.getNumRows() <= 0 ? "" : ("&" + PARAM_ROW_COUNT + "=" + _params.getNumRows())) + (_params.getSegmentLocation() == null ? "" - : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation())); + : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation())) + + (_params.getStreamPartitionMsgOffset() == null ? "" + : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" + _params.getStreamPartitionMsgOffset())) + ; } public static class Params { @@ -204,6 +208,7 @@ public class SegmentCompletionProtocol { private String _segmentLocation; private long _memoryUsedBytes; private long _segmentSizeBytes; + private String _streamPartitionMsgOffset; public Params() { _offset = -1L; @@ -216,6 +221,7 @@ public class SegmentCompletionProtocol { _segmentLocation = null; _memoryUsedBytes = MEMORY_USED_BYTES_DEFAULT; _segmentSizeBytes = SEGMENT_SIZE_BYTES_DEFAULT; + _streamPartitionMsgOffset = null; } public Params(Params params) { @@ -229,8 +235,10 @@ public class SegmentCompletionProtocol { _segmentLocation = params.getSegmentLocation(); _memoryUsedBytes = params.getMemoryUsedBytes(); _segmentSizeBytes = params.getSegmentSizeBytes(); + _streamPartitionMsgOffset = params.getStreamPartitionMsgOffset().toString(); } + @Deprecated public Params withOffset(long offset) { _offset = offset; return this; @@ -287,7 +295,14 @@ public class SegmentCompletionProtocol { } public Params withStreamPartitionMsgOffset(StreamPartitionMsgOffset offset) { - _offset = offset.getOffset(); + _streamPartitionMsgOffset = offset.toString(); + // Try to populate the offset if possible. + // TODO Remove this code once we have both sides be able to live without _offset. + try { + _offset = Long.parseLong(_streamPartitionMsgOffset); + } catch (Exception e) { + // Ignore, if the recipient excepts _offset, it will return an error to the sender. + } return this; } @@ -295,7 +310,8 @@ public class SegmentCompletionProtocol { return _segmentName; } - public long getOffset() { + @Deprecated + private long getOffset() { return _offset; } @@ -336,14 +352,19 @@ public class SegmentCompletionProtocol { } public StreamPartitionMsgOffset getStreamPartitionMsgOffset() { - return new StreamPartitionMsgOffset(_offset); + if (_streamPartitionMsgOffset != null) { + return new StreamPartitionMsgOffset(_streamPartitionMsgOffset); + } else { + return new StreamPartitionMsgOffset(_offset); + } } public String toString() { return "Offset: " + _offset + ",Segment name: " + _segmentName + ",Instance Id: " + _instanceId + ",Reason: " + _reason + ",NumRows: " + _numRows + ",BuildTimeMillis: " + _buildTimeMillis + ",WaitTimeMillis: " + _waitTimeMillis + ",ExtraTimeSec: " + _extraTimeSec + ",SegmentLocation: " + _segmentLocation - + ",MemoryUsedBytes: " + _memoryUsedBytes + ",SegmentSizeBytes: " + _segmentSizeBytes; + + ",MemoryUsedBytes: " + _memoryUsedBytes + ",SegmentSizeBytes: " + _segmentSizeBytes + + ",StreamPartitionMsgOffset: " + _streamPartitionMsgOffset; } } } @@ -404,6 +425,7 @@ public class SegmentCompletionProtocol { private boolean _splitCommit; private String _segmentLocation; private String _controllerVipUrl; + private String _streamPartitionMsgOffset; public Response() { } @@ -415,6 +437,7 @@ public class SegmentCompletionProtocol { _splitCommit = params.isSplitCommit(); _segmentLocation = params.getSegmentLocation(); _controllerVipUrl = params.getControllerVipUrl(); + _streamPartitionMsgOffset = params.getStreamPartitionMsgOffset(); } @JsonProperty(STATUS_KEY) @@ -427,17 +450,26 @@ public class SegmentCompletionProtocol { _status = status; } + @Deprecated @JsonProperty(OFFSET_KEY) public long getOffset() { return _offset; } // TODO Make it a JsonProperty when we are ready to move the protocol + // This method is called in the server when the controller responds with + // CATCH_UP response to segmentConsumed() API. @JsonIgnore - public StreamPartitionMsgOffset getStreamPartitionMsgOffset() { - return new StreamPartitionMsgOffset(_offset); + public String getStreamPartitionMsgOffset() { + return _streamPartitionMsgOffset; } + @JsonIgnore + public void setStreamPartitionMsgOffset(String streamPartitionMsgOffset) { + _streamPartitionMsgOffset = streamPartitionMsgOffset; + } + + @Deprecated @JsonProperty(OFFSET_KEY) public void setOffset(long offset) { _offset = offset; @@ -485,6 +517,14 @@ public class SegmentCompletionProtocol { _segmentLocation = segmentLocation; } + public StreamPartitionMsgOffset extractOffset() { + if (_streamPartitionMsgOffset != null) { + return new StreamPartitionMsgOffset(getStreamPartitionMsgOffset()); + } else { + return new StreamPartitionMsgOffset(getOffset()); + } + } + public String toJsonString() { try { return JsonUtils.objectToString(this); @@ -508,6 +548,7 @@ public class SegmentCompletionProtocol { private boolean _splitCommit; private String _segmentLocation; private String _controllerVipUrl; + private String _streamPartitionMsgOffset; public Params() { _offset = -1L; @@ -516,11 +557,7 @@ public class SegmentCompletionProtocol { _splitCommit = false; _segmentLocation = null; _controllerVipUrl = null; - } - - public Params withOffset(long offset) { - _offset = offset; - return this; + _streamPartitionMsgOffset = null; } public Params withStatus(ControllerResponseStatus status) { @@ -549,7 +586,13 @@ public class SegmentCompletionProtocol { } public Params withStreamPartitionMsgOffset(StreamPartitionMsgOffset offset) { - _offset = offset.getOffset(); + _streamPartitionMsgOffset = offset.toString(); + // TODO Remove the block below once we have both parties be fine without _offset being present. + try { + _offset = Long.parseLong(_streamPartitionMsgOffset); + } catch (Exception e) { + // Ignore. If the received expects _offset, it will return an error to the sender. + } return this; } @@ -557,7 +600,8 @@ public class SegmentCompletionProtocol { return _status; } - public long getOffset() { + @Deprecated + private long getOffset() { return _offset; } @@ -577,8 +621,8 @@ public class SegmentCompletionProtocol { return _controllerVipUrl; } - public StreamPartitionMsgOffset getStreamPartitionMsgOffset() { - return new StreamPartitionMsgOffset(_offset); + public String getStreamPartitionMsgOffset() { + return _streamPartitionMsgOffset; } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index 6ccc682..20c3298 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -50,6 +50,7 @@ import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.slf4j.Logger; @@ -79,10 +80,12 @@ public class LLCSegmentCompletionHandlers { public String extendBuildTime(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, @QueryParam(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC) int extraTimeSec) { - if (instanceId == null || segmentName == null || offset == -1) { - LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", offset, segmentName, instanceId); + if (instanceId == null || segmentName == null || (offset == -1 && streamPartitionMsgOffset == null)) { + LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", + offset, segmentName, instanceId, streamPartitionMsgOffset); return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } if (extraTimeSec <= 0) { @@ -92,8 +95,9 @@ public class LLCSegmentCompletionHandlers { } SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset) - .withExtraTimeSec(extraTimeSec); + requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withExtraTimeSec(extraTimeSec); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); + LOGGER.info("Processing extendBuildTime:{}", requestParams.toString()); SegmentCompletionProtocol.Response response = _segmentCompletionManager.extendBuildTime(requestParams); @@ -103,23 +107,39 @@ public class LLCSegmentCompletionHandlers { return responseStr; } + private void extractOffsetFromParams(SegmentCompletionProtocol.Request.Params requestParams, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, + @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset) { + // If the sender sent us a stream partition message offset, use it. If not, the sender is still old + // version, so pick up the old offset from it. + // TODO Remove this backup use of offset when server and controller are upgraded. + if (streamPartitionMsgOffset != null) { + requestParams.withStreamPartitionMsgOffset(new StreamPartitionMsgOffset(streamPartitionMsgOffset)); + } else { + requestParams.withOffset(offset); + } + } + @GET @Path(SegmentCompletionProtocol.MSG_TYPE_CONSUMED) @Produces(MediaType.APPLICATION_JSON) public String segmentConsumed(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason, @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes, @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows) { - if (instanceId == null || segmentName == null || offset == -1) { - LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", offset, segmentName, instanceId); + if (instanceId == null || segmentName == null || (offset == -1 && streamPartitionMsgOffset == null)) { + LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", + offset, segmentName, instanceId, streamPartitionMsgOffset); return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset).withReason(stopReason) + requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason) .withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); LOGGER.info("Processing segmentConsumed:{}", requestParams.toString()); SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentConsumed(requestParams); @@ -134,14 +154,17 @@ public class LLCSegmentCompletionHandlers { public String segmentStoppedConsuming(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason) { - if (instanceId == null || segmentName == null || offset == -1) { - LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", offset, segmentName, instanceId); + if (instanceId == null || segmentName == null || (offset == -1 && streamPartitionMsgOffset == null)) { + LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", + offset, segmentName, instanceId, streamPartitionMsgOffset); return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset).withReason(stopReason); + requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString()); SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentStoppedConsuming(requestParams); @@ -156,20 +179,25 @@ public class LLCSegmentCompletionHandlers { public String segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes, @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes) { - if (instanceId == null || segmentName == null || offset == -1) { + + if (instanceId == null || segmentName == null || (offset == -1 && streamPartitionMsgOffset == null)) { + LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", + offset, segmentName, instanceId, streamPartitionMsgOffset); LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", offset, segmentName, instanceId); return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset) - .withMemoryUsedBytes(memoryUsedBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis) - .withNumRows(numRows).withSegmentSizeBytes(segmentSizeBytes); + requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withMemoryUsedBytes(memoryUsedBytes) + .withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows) + .withSegmentSizeBytes(segmentSizeBytes); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString()); @@ -187,14 +215,17 @@ public class LLCSegmentCompletionHandlers { @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String segmentLocation, @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes, @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes) { - if (instanceId == null || segmentName == null || offset == -1 || segmentLocation == null) { - LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}", offset, segmentName, - instanceId, segmentLocation); + if (instanceId == null || segmentName == null + || segmentLocation == null + || (offset == -1 && streamPartitionMsgOffset == null)) { + LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}, streamPartitionMsgOffset={}", + offset, segmentName, instanceId, segmentLocation, streamPartitionMsgOffset); // TODO: memoryUsedInBytes = 0 if not present in params. Add validation when we start using it return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } @@ -208,10 +239,11 @@ public class LLCSegmentCompletionHandlers { return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset) + requestParams.withInstanceId(instanceId).withSegmentName(segmentName) .withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes) .withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows) .withMemoryUsedBytes(memoryUsedBytes); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString()); final boolean isSuccess = true; @@ -233,15 +265,18 @@ public class LLCSegmentCompletionHandlers { public String segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes, @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes, @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, FormDataMultiPart multiPart) { + SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset) + requestParams.withInstanceId(instanceId).withSegmentName(segmentName) .withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis) .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); LOGGER.info("Processing segmentCommit:{}", requestParams.toString()); final SegmentCompletionManager segmentCompletionManager = _segmentCompletionManager; @@ -296,7 +331,7 @@ public class LLCSegmentCompletionHandlers { response = segmentCompletionManager.segmentCommitEnd(requestParams, success, false, committingSegmentDescriptor); LOGGER.info("Response to segmentCommit: instance={} segment={} status={} offset={}", requestParams.getInstanceId(), - requestParams.getSegmentName(), response.getStatus(), response.getOffset()); + requestParams.getSegmentName(), response.getStatus(), response.extractOffset()); return response.toJsonString(); } @@ -310,9 +345,12 @@ public class LLCSegmentCompletionHandlers { @Consumes(MediaType.MULTIPART_FORM_DATA) public String segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, - @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, FormDataMultiPart multiPart) { + @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, + FormDataMultiPart multiPart) { SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset); + requestParams.withInstanceId(instanceId).withSegmentName(segmentName); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); LOGGER.info("Processing segmentUpload:{}", requestParams.toString()); // Get the segment from the form input and put it into the data directory (could be remote) @@ -349,24 +387,27 @@ public class LLCSegmentCompletionHandlers { @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String segmentLocation, @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes, @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis, @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes, FormDataMultiPart metadataFiles) { - if (instanceId == null || segmentName == null || offset == -1 || segmentLocation == null || metadataFiles == null) { - LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}", offset, segmentName, - instanceId, segmentLocation); + if (instanceId == null || segmentName == null || segmentLocation == null || metadataFiles == null + || (offset == -1 && streamPartitionMsgOffset == null)) { + LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}, streamPartitionMsgOffset={}", + offset, segmentName, instanceId, segmentLocation, streamPartitionMsgOffset); // TODO: memoryUsedInBytes = 0 if not present in params. Add validation when we start using it return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); - requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset) + requestParams.withInstanceId(instanceId).withSegmentName(segmentName) .withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes) .withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows) .withMemoryUsedBytes(memoryUsedBytes); + extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset); LOGGER.info("Processing segmentCommitEndWithMetadata:{}", requestParams.toString()); SegmentMetadataImpl segmentMetadata; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java index 15a443e..72486ba 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java @@ -47,7 +47,7 @@ public class SegmentCompletionProtocolDeserTest { SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(params); assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS); - assertEquals(response.getStreamPartitionMsgOffset().compareTo(OFFSET), 0); + assertEquals(new StreamPartitionMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET), 0); assertEquals(response.getSegmentLocation(), SEGMENT_LOCATION); assertTrue(response.isSplitCommit()); assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); @@ -63,7 +63,7 @@ public class SegmentCompletionProtocolDeserTest { SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(params); assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS); - assertEquals(response.getStreamPartitionMsgOffset().compareTo(OFFSET), 0); + assertEquals(new StreamPartitionMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET), 0); assertNull(response.getSegmentLocation()); assertFalse(response.isSplitCommit()); assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index 3bd88da..d8dda56 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.zookeeper.data.Stat; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -54,9 +55,9 @@ public class SegmentCompletionTest { private final String s2 = "S2"; private final String s3 = "S3"; - private final long s1Offset = 20L; - private final long s2Offset = 40L; - private final long s3Offset = 30L; + private final StreamPartitionMsgOffset s1Offset = new StreamPartitionMsgOffset(20L); + private final StreamPartitionMsgOffset s2Offset = new StreamPartitionMsgOffset(40L); + private final StreamPartitionMsgOffset s3Offset = new StreamPartitionMsgOffset(30L); @BeforeMethod public void testCaseSetup() @@ -64,6 +65,14 @@ public class SegmentCompletionTest { testCaseSetup(true, true); } + private void verifyOffset(SegmentCompletionProtocol.Response response, StreamPartitionMsgOffset offset) { + Assert.assertEquals(new StreamPartitionMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(offset), 0); + // Compatibility test: + // The controller must always respond with BOTH fields -- 'offset' as well as 'streamPartitionMsgOffset', and they + // should be the same value. + Assert.assertEquals(response.getOffset(), offset.getOffset()); + } + public void testCaseSetup(boolean isLeader, boolean isConnected) throws Exception { PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class); @@ -114,18 +123,18 @@ public class SegmentCompletionTest { // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = 5; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; params = - new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr).withReason(reason); + new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr).withReason(reason); response = segmentCompletionMgr.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); Assert.assertEquals(new LLCSegmentName(segmentNameStr), segmentManager._stoppedSegmentName); @@ -135,27 +144,27 @@ public class SegmentCompletionTest { // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // s3 comes back with new caught up offset, it should get a HOLD, since commit is not done yet. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 executes a successful commit segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -165,7 +174,7 @@ public class SegmentCompletionTest { // Now if s3 or s1 come back, they are asked to keep the segment they have. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP); @@ -182,7 +191,7 @@ public class SegmentCompletionTest { // s1 stops consuming at t = 5; segmentCompletionMgr._seconds = 5; params = - new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr).withReason(reason); + new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr).withReason(reason); response = segmentCompletionMgr.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); Assert.assertEquals(new LLCSegmentName(segmentNameStr), segmentManager._stoppedSegmentName); @@ -192,40 +201,40 @@ public class SegmentCompletionTest { // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40, s2 should have been decided as the winner now // since we are never expecting to hear back from s1 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // s3 happens to come back (after catchup to s2offset) before s2 should get a hold since s2 has been decided as // the winner. // TODO Can optimize here since s2 is not notified yet. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // s2 executes a successful commit segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -235,7 +244,7 @@ public class SegmentCompletionTest { // Now if s3 or s1 come back, they are asked to keep the segment they have. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP); @@ -251,7 +260,7 @@ public class SegmentCompletionTest { Request.Params params; segmentCompletionMgr._seconds = 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withReason("some reason"); response = segmentCompletionMgr.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); @@ -284,7 +293,7 @@ public class SegmentCompletionTest { SegmentCompletionProtocol.Response response; Request.Params params; segmentCompletionMgr._seconds = 10; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.FAILED); } @@ -297,46 +306,46 @@ public class SegmentCompletionTest { Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = 5L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); // TODO: Verify controller asked to do a split commit Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // s3 comes back with new caught up offset, it should get a HOLD, since commit is not done yet. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 executes a successful commit start segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); // s2's file does not successfully commit because MockPinotLLCRealtimeSegmentManager.commitSegmentFile() returns // false when detecting SegmentLocation == "doNotCommitMe"; segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withSegmentLocation("doNotCommitMe"); response = segmentCompletionMgr .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); @@ -347,30 +356,30 @@ public class SegmentCompletionTest { // Now s1 comes back; it is asked to hold. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // Now s3 comes back; it is asked to commit segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT); // Now s2 comes back; it is asked to hold segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); // s3 executes a successful commit start segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); // s3's file successfully commits segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withSegmentLocation("location"); response = segmentCompletionMgr .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); @@ -385,45 +394,45 @@ public class SegmentCompletionTest { Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = startTime; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); // TODO: Verify controller asked to do a split commit Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // s3 comes back with new caught up offset, it should get a HOLD, since commit is not done yet. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 executes a successful commit segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withSegmentLocation("location"); response = segmentCompletionMgr .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); @@ -434,7 +443,7 @@ public class SegmentCompletionTest { // Now if s3 or s1 come back, they are asked to keep the segment they have. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP); @@ -450,30 +459,30 @@ public class SegmentCompletionTest { Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = 5L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 20, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 20, gets commit segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT); - Assert.assertEquals(response.getOffset(), s1Offset); + verifyOffset(response, s1Offset); // s3 sends a commit start with 20 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); // s3 comes back to try to commit with a different offset segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr) .withSegmentLocation("location"); response = segmentCompletionMgr .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); @@ -484,7 +493,7 @@ public class SegmentCompletionTest { // Now if s2 or s1 come back, they are asked to hold. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); @@ -498,43 +507,43 @@ public class SegmentCompletionTest { Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = startTime; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // s3 comes back with new caught up offset, it should get a HOLD, since commit is not done yet. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 executes a successful commit segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -544,7 +553,7 @@ public class SegmentCompletionTest { // Now if s3 or s1 come back, they are asked to keep the segment they have. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP); @@ -560,7 +569,7 @@ public class SegmentCompletionTest { Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = 5L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr) .withReason("rowLimit"); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.NOT_LEADER); @@ -572,7 +581,7 @@ public class SegmentCompletionTest { SegmentCompletionProtocol.Response response; Request.Params params; segmentCompletionMgr._seconds = 10L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); @@ -584,40 +593,40 @@ public class SegmentCompletionTest { SegmentCompletionProtocol.Response response; Request.Params params; segmentCompletionMgr._seconds = 10L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT); // S2 comes with the same offset as S1 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); segmentCompletionMgr._seconds += 1; // S3 comes with a different offset and without row limit. we ask it to hold even though it is higher. - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); // S1 comes back to commit the segment segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // We ask S2 to keep the segment - params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.KEEP); // And we ask S3 to discard because it was ahead. - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.DISCARD); @@ -643,37 +652,38 @@ public class SegmentCompletionTest { // s1 sends offset of 20, gets HOLD at t = 5s; final int startTimeSecs = 5; segmentCompletionMgr._seconds = startTimeSecs; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // Now s1 comes back again, and is asked to hold segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 is asked to commit. segmentCompletionMgr._seconds += SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 1000; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // Now s3 comes up with a better offset, but we ask it to hold, since the committer has not committed yet. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset + 10).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withSegmentName(segmentNameStr); + params.withStreamPartitionMsgOffset(new StreamPartitionMsgOffset(s2Offset.getOffset() + 10)); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 commits. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withSegmentLocation("location"); response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); @@ -683,7 +693,8 @@ public class SegmentCompletionTest { // Now s3 comes back to get a discard. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset + 10).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withSegmentName(segmentNameStr); + params.withStreamPartitionMsgOffset(new StreamPartitionMsgOffset(s2Offset.getOffset() + 10)); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.DISCARD); // Now the FSM should have disappeared from the map @@ -699,42 +710,42 @@ public class SegmentCompletionTest { // s1 sends offset of 20, gets HOLD at t = 5s; final int startTimeSecs = 5; segmentCompletionMgr._seconds = startTimeSecs; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 is asked to catch up. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // All servers are dead segmentCompletionMgr._seconds += 3600; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); Assert.assertFalse(fsmMap.containsKey(segmentNameStr)); - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); Assert.assertTrue(fsmMap.containsKey(segmentNameStr)); // Now s2 is asked to commit because the max time to pick committer has passed. segmentCompletionMgr._seconds += 4; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT); } @@ -748,23 +759,23 @@ public class SegmentCompletionTest { // s1 sends offset of 20, gets HOLD at t = 5s; final int startTimeSecs = 5; segmentCompletionMgr._seconds = startTimeSecs; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 is asked to hold. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); @@ -772,7 +783,7 @@ public class SegmentCompletionTest { segmentCompletionMgr._seconds += SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 1000; // But since s1 and s3 are in HOLDING state, they should come back again. s1 is asked to catchup - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); @@ -782,7 +793,7 @@ public class SegmentCompletionTest { // s1 comes back with the updated offset, since it was asked to catch up. // The FSM will be aborted, and destroyed ... - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); @@ -790,19 +801,19 @@ public class SegmentCompletionTest { // s1 comes back again, a new FSM created segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 comes back segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // And winner chosen when the last one does not come back at all segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); @@ -820,28 +831,28 @@ public class SegmentCompletionTest { final String tableName = new LLCSegmentName(segmentNameStr).getTableName(); Assert.assertNull(commitTimeMap.get(tableName)); segmentCompletionMgr._seconds = startTime; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); long commitTimeSec = response.getBuildTimeSeconds(); @@ -849,7 +860,7 @@ public class SegmentCompletionTest { // Fast forward to one second before commit time, and send a lease renewal request for 20s segmentCompletionMgr._seconds = startTime + commitTimeSec - 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -857,7 +868,7 @@ public class SegmentCompletionTest { // Another lease extension in 19s. segmentCompletionMgr._seconds += 19; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -865,7 +876,7 @@ public class SegmentCompletionTest { // Commit in 15s segmentCompletionMgr._seconds += 15; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); long commitTimeMs = (segmentCompletionMgr._seconds - startTime) * 1000; @@ -887,28 +898,28 @@ public class SegmentCompletionTest { // s1 sends offset of 20, gets HOLD at t = 5s; final long startTime = 5; segmentCompletionMgr._seconds = startTime; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); long commitTimeSec = response.getBuildTimeSeconds(); @@ -916,7 +927,7 @@ public class SegmentCompletionTest { // Fast forward to one second before commit time, and send a lease renewal request for 20s segmentCompletionMgr._seconds = startTime + commitTimeSec - 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -924,7 +935,7 @@ public class SegmentCompletionTest { // Come back too late. segmentCompletionMgr._seconds += 25; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); // now FSM should be out of the map. @@ -940,28 +951,28 @@ public class SegmentCompletionTest { // s1 sends offset of 20, gets HOLD at t = 5s; final long startTime = 5; segmentCompletionMgr._seconds = startTime; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); long commitTimeSec = response.getBuildTimeSeconds(); @@ -969,7 +980,7 @@ public class SegmentCompletionTest { // Fast forward to one second before commit time, and send a lease renewal request for 20s segmentCompletionMgr._seconds = startTime + commitTimeSec - 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -979,7 +990,7 @@ public class SegmentCompletionTest { // Lease will not be granted if the time taken so far plus lease time exceeds the max allowabale. while (segmentCompletionMgr._seconds + leaseTimeSec <= startTime + SegmentCompletionManager .getMaxCommitTimeForAllSegmentsSeconds()) { - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withExtraTimeSec(leaseTimeSec); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -988,7 +999,7 @@ public class SegmentCompletionTest { } // Now the lease request should fail. - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withExtraTimeSec(leaseTimeSec); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.FAILED); @@ -1002,28 +1013,28 @@ public class SegmentCompletionTest { Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = 5; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); @@ -1031,24 +1042,24 @@ public class SegmentCompletionTest { replaceSegmentCompletionManager(); // s3 comes back with the correct offset but is asked to hold. - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s1 comes back, and still asked to hold. - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 has no idea the controller failed, so it comes back with a commit,but the controller asks it to hold, // (essentially a commit failure) segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back. - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); } @@ -1062,28 +1073,28 @@ public class SegmentCompletionTest { Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._seconds = 5; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 sends offset of 40, gets HOLD segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s3 sends offset of 30, gets catchup to 40 segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s3Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - Assert.assertEquals(response.getOffset(), s2Offset); + verifyOffset(response, s2Offset); // Now s1 comes back, and is asked to catchup. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); // s2 is asked to commit. segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); @@ -1091,24 +1102,24 @@ public class SegmentCompletionTest { replaceSegmentCompletionManager(); // s3 comes back with the correct offset but is asked to hold. - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s3).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s1 comes back, and still asked to hold. - params = new Request.Params().withInstanceId(s1).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); // s2 has no idea the controller failed, so it comes back with a commit,but the controller asks it to hold, // (essentially a commit failure) segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back. - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2).withStreamPartitionMsgOffset(s2Offset).withSegmentName(segmentNameStr) .withSegmentLocation("location"); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); @@ -1121,11 +1132,11 @@ public class SegmentCompletionTest { SegmentCompletionProtocol.Response response; SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER); - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); + params = new Request.Params().withInstanceId(s1).withStreamPartitionMsgOffset(s1Offset).withSegmentName(segmentNameStr); response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index d13344a..95633d5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -536,7 +536,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _state = State.HOLDING; SegmentCompletionProtocol.Response response = postSegmentConsumedMsg(); SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus(); - StreamPartitionMsgOffset rspOffset = new StreamPartitionMsgOffset(response.getOffset()); + StreamPartitionMsgOffset rspOffset = response.extractOffset(); boolean success; switch (status) { case NOT_LEADER: diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index a682c85..3e3020a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -41,6 +41,7 @@ import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder; +import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -69,7 +70,8 @@ public class LLRealtimeSegmentDataManagerTest { private static final LLCSegmentName _segmentName = new LLCSegmentName(_tableName, _partitionId, _sequenceId, _segTimeMs); private static final String _segmentNameStr = _segmentName.getSegmentName(); - private static final long _startOffset = 19885L; + private static final long _startOffsetValue = 19885L; + private static final StreamPartitionMsgOffset _startOffset = new StreamPartitionMsgOffset(_startOffsetValue); private static final String _topicName = "someTopic"; private static final int maxRowsInSegment = 250000; private static final long maxTimeForSegmentCloseMs = 64368000L; @@ -134,7 +136,7 @@ public class LLRealtimeSegmentDataManagerTest { LLCRealtimeSegmentZKMetadata segmentZKMetadata = new LLCRealtimeSegmentZKMetadata(); segmentZKMetadata.setSegmentName(_segmentNameStr); - segmentZKMetadata.setStartOffset(_startOffset); + segmentZKMetadata.setStartOffset(_startOffset.getOffset()); segmentZKMetadata.setCreationTime(System.currentTimeMillis()); return segmentZKMetadata; } @@ -173,12 +175,12 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); - final long endOffset = _startOffset + 500; + final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(_startOffsetValue + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response( new SegmentCompletionProtocol.Response.Params() - .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(endOffset)); + .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withStreamPartitionMsgOffset(endOffset)); // And then never consume as long as we get a hold response, 100 times. for (int i = 0; i < 100; i++) { segmentDataManager._responses.add(response); @@ -202,14 +204,14 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); - final long endOffset = _startOffset + 500; + final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(_startOffsetValue + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); final SegmentCompletionProtocol.Response holdResponse = new SegmentCompletionProtocol.Response( - new SegmentCompletionProtocol.Response.Params().withOffset(endOffset) + new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); final SegmentCompletionProtocol.Response commitResponse = new SegmentCompletionProtocol.Response( - new SegmentCompletionProtocol.Response.Params().withOffset(endOffset) + new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)); // And then never consume as long as we get a hold response, 100 times. segmentDataManager._responses.add(holdResponse); @@ -233,11 +235,11 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); - final long endOffset = _startOffset + 500; + final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(_startOffsetValue + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); final SegmentCompletionProtocol.Response commitResponse = new SegmentCompletionProtocol.Response( - new SegmentCompletionProtocol.Response.Params().withOffset(endOffset) + new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)); segmentDataManager._responses.add(commitResponse); segmentDataManager._failSegmentBuild = true; @@ -254,23 +256,24 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); - final long firstOffset = _startOffset + 500; - final long catchupOffset = firstOffset + 10; + final StreamPartitionMsgOffset firstOffset = new StreamPartitionMsgOffset(_startOffsetValue + 500); + final StreamPartitionMsgOffset catchupOffset = new StreamPartitionMsgOffset(firstOffset.getOffset() + 10); // We should consume initially... segmentDataManager._consumeOffsets.add(firstOffset); segmentDataManager._consumeOffsets.add(catchupOffset); // Offset after catchup final SegmentCompletionProtocol.Response holdResponse1 = new SegmentCompletionProtocol.Response( new SegmentCompletionProtocol.Response.Params() .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD). - withOffset(firstOffset)); + withStreamPartitionMsgOffset(firstOffset)); final SegmentCompletionProtocol.Response catchupResponse = new SegmentCompletionProtocol.Response( new SegmentCompletionProtocol.Response.Params() - .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP).withOffset(catchupOffset)); + .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP) + .withStreamPartitionMsgOffset(catchupOffset)); final SegmentCompletionProtocol.Response holdResponse2 = new SegmentCompletionProtocol.Response( - new SegmentCompletionProtocol.Response.Params().withOffset(catchupOffset) + new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); final SegmentCompletionProtocol.Response commitResponse = new SegmentCompletionProtocol.Response( - new SegmentCompletionProtocol.Response.Params().withOffset(catchupOffset) + new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)); // And then never consume as long as we get a hold response, 100 times. segmentDataManager._responses.add(holdResponse1); @@ -296,10 +299,10 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); - final long endOffset = _startOffset + 500; + final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(_startOffsetValue + 500); segmentDataManager._consumeOffsets.add(endOffset); final SegmentCompletionProtocol.Response discardResponse = new SegmentCompletionProtocol.Response( - new SegmentCompletionProtocol.Response.Params().withOffset(endOffset) + new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.DISCARD)); segmentDataManager._responses.add(discardResponse); @@ -321,10 +324,10 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); - final long endOffset = _startOffset + 500; + final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(_startOffsetValue + 500); segmentDataManager._consumeOffsets.add(endOffset); SegmentCompletionProtocol.Response.Params params = new SegmentCompletionProtocol.Response.Params(); - params.withOffset(endOffset).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP); + params.withStreamPartitionMsgOffset(endOffset).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP); final SegmentCompletionProtocol.Response keepResponse = new SegmentCompletionProtocol.Response(params); segmentDataManager._responses.add(keepResponse); @@ -345,11 +348,11 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer(); - final long endOffset = _startOffset + 500; + final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(_startOffsetValue + 500); // We should consume initially... segmentDataManager._consumeOffsets.add(endOffset); final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response( - new SegmentCompletionProtocol.Response.Params().withOffset(endOffset) + new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)); // And then never consume as long as we get a Not leader response, 100 times. for (int i = 0; i < 100; i++) { @@ -389,8 +392,9 @@ public class LLRealtimeSegmentDataManagerTest { public void testOnlineTransitionAfterStop() throws Exception { LLCRealtimeSegmentZKMetadata metadata = new LLCRealtimeSegmentZKMetadata(); - final long finalOffset = _startOffset + 600; - metadata.setEndOffset(finalOffset); + final long finalOffsetValue = _startOffsetValue + 600; + final StreamPartitionMsgOffset finalOffset = new StreamPartitionMsgOffset(finalOffsetValue); + metadata.setEndOffset(finalOffset.getOffset()); { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); @@ -437,7 +441,7 @@ public class LLRealtimeSegmentDataManagerTest { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.HOLDING); - segmentDataManager.setCurrentOffset(finalOffset + 1); + segmentDataManager.setCurrentOffset(finalOffsetValue + 1); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); @@ -449,7 +453,7 @@ public class LLRealtimeSegmentDataManagerTest { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); - segmentDataManager.setCurrentOffset(finalOffset + 1); + segmentDataManager.setCurrentOffset(finalOffsetValue + 1); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); @@ -461,7 +465,7 @@ public class LLRealtimeSegmentDataManagerTest { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._stopWaitTimeMs = 0; segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); - segmentDataManager._consumeOffsets.add(finalOffset - 1); + segmentDataManager._consumeOffsets.add(new StreamPartitionMsgOffset(finalOffsetValue - 1)); segmentDataManager.goOnlineFromConsuming(metadata); Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); @@ -515,7 +519,7 @@ public class LLRealtimeSegmentDataManagerTest { { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); - final long finalOffset = _startOffset + 100; + final long finalOffset = _startOffsetValue + 100; segmentDataManager.setFinalOffset(finalOffset); segmentDataManager.setCurrentOffset(finalOffset - 1); Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); @@ -528,7 +532,7 @@ public class LLRealtimeSegmentDataManagerTest { FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(); _timeNow += maxTimeForSegmentCloseMs; segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP); - final long finalOffset = _startOffset + 100; + final long finalOffset = _startOffsetValue + 100; segmentDataManager.setFinalOffset(finalOffset); segmentDataManager.setCurrentOffset(finalOffset - 1); Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); @@ -543,7 +547,7 @@ public class LLRealtimeSegmentDataManagerTest { _timeNow += 1; segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE); segmentDataManager.setConsumeEndTime(_timeNow + 10); - final long finalOffset = _startOffset + 100; + final long finalOffset = _startOffsetValue + 100; segmentDataManager.setFinalOffset(finalOffset); segmentDataManager.setCurrentOffset(finalOffset - 1); Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); @@ -557,7 +561,7 @@ public class LLRealtimeSegmentDataManagerTest { segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE); final long endTime = _timeNow + 10; segmentDataManager.setConsumeEndTime(endTime); - final long finalOffset = _startOffset + 100; + final long finalOffset = _startOffsetValue + 100; segmentDataManager.setFinalOffset(finalOffset); segmentDataManager.setCurrentOffset(finalOffset - 1); _timeNow = endTime - 1; @@ -627,7 +631,7 @@ public class LLRealtimeSegmentDataManagerTest { // Set up the responses so that we get a failed response first and then a success response. segmentDataManager._responses.add(commitFailed); final long leaseTime = 50000L; - final long finalOffset = _startOffset + 600; + final long finalOffset = _startOffsetValue + 600; segmentDataManager.setCurrentOffset(finalOffset); // We have set up commit to fail, so we should carry over the segment file. @@ -703,7 +707,7 @@ public class LLRealtimeSegmentDataManagerTest { public Field _state; public Field _shouldStop; public Field _stopReason; - public LinkedList<Long> _consumeOffsets = new LinkedList<>(); + public LinkedList<StreamPartitionMsgOffset> _consumeOffsets = new LinkedList<>(); public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>(); public boolean _commitSegmentCalled = false; public boolean _buildSegmentCalled = false; @@ -787,7 +791,7 @@ public class LLRealtimeSegmentDataManagerTest { if (_throwExceptionFromConsume) { throw new PermanentConsumerException(new Throwable("Offset out of range")); } - setCurrentOffset(_consumeOffsets.remove()); + setCurrentOffset(_consumeOffsets.remove().getOffset()); terminateLoopIfNecessary(); return true; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java index a03ca98..d398d9f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java @@ -44,6 +44,7 @@ import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; import org.apache.pinot.server.realtime.ControllerLeaderLocator; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.Assert; @@ -142,8 +143,8 @@ public class SegmentCompletionIntegrationTest extends BaseClusterIntegrationTest ServerSegmentCompletionProtocolHandler protocolHandler = new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()), realtimeTableName); SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); - params.withOffset(45688L).withSegmentName(_currentSegment).withReason("RandomReason") - .withInstanceId(_serverInstance); + params.withStreamPartitionMsgOffset(new StreamPartitionMsgOffset(45688L)).withSegmentName(_currentSegment) + .withReason("RandomReason") .withInstanceId(_serverInstance); SegmentCompletionProtocol.Response response = protocolHandler.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java index e571b91..c795448 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java @@ -42,6 +42,15 @@ public class StreamPartitionMsgOffset implements Comparable { _offset = offset; } + /** + * This constructor will go away when this class becomes an interface + * @param offsetStr + */ + @Deprecated + public StreamPartitionMsgOffset(String offsetStr) { + _offset = Long.parseLong(offsetStr); + } + @Deprecated public void setOffset(long offset) { _offset = offset; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org