This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3a616d8ce5 Cleanup non split commit code (#14559) 3a616d8ce5 is described below commit 3a616d8ce525bf9e611cde273373e57a768ce2de Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Nov 29 20:02:35 2024 -0800 Cleanup non split commit code (#14559) --- .../resources/LLCSegmentCompletionHandlers.java | 91 +-------- .../core/realtime/SegmentCompletionManager.java | 40 ++-- .../helix/core/realtime/SegmentCompletionTest.java | 219 ++++++--------------- 3 files changed, 77 insertions(+), 273 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index 5e5adb0d80..c459436225 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -53,7 +53,6 @@ import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; -import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; @@ -199,94 +198,6 @@ public class LLCSegmentCompletionHandlers { return response; } - // Remove after releasing 1.1 (server always use split commit) - @Deprecated - @POST - @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT) - @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.COMMIT_SEGMENT) - @Authenticate(AccessType.CREATE) - @Consumes(MediaType.MULTIPART_FORM_DATA) - @Produces(MediaType.APPLICATION_JSON) - public String segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, - @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, - @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset, - @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes, - @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis, - @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis, - @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes, - @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, FormDataMultiPart multiPart) { - SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params() - .withInstanceId(instanceId) - .withSegmentName(segmentName) - .withStreamPartitionMsgOffset(streamPartitionMsgOffset) - .withSegmentSizeBytes(segmentSizeBytes) - .withBuildTimeMillis(buildTimeMillis) - .withWaitTimeMillis(waitTimeMillis) - .withNumRows(numRows) - .withMemoryUsedBytes(memoryUsedBytes); - LOGGER.info("Processing segmentCommit: {}", requestParams); - - SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentCommitStart(requestParams); - - CommittingSegmentDescriptor committingSegmentDescriptor = - CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams); - boolean success = false; - - if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) { - File localTempFile = null; - try { - localTempFile = extractSegmentFromFormToLocalTempFile(multiPart, segmentName); - SegmentMetadataImpl segmentMetadata = extractMetadataFromLocalSegmentFile(localTempFile); - // Store the segment file to Pinot FS. - String rawTableName = new LLCSegmentName(segmentName).getTableName(); - URI segmentFileURI = - URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), rawTableName, - URIUtils.encode(segmentName)); - PinotFS pinotFS = PinotFSFactory.create(segmentFileURI.getScheme()); - // Multiple threads can reach this point at the same time, if the following scenario happens - // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in - // SegmentCompletionManager timed out, and allowed another server to commit, which did so very quickly (somehow - // the network speeds changed). The second server made it through the FSM and reached this point. - // The synchronization below takes care that exactly one file gets moved in place. - // There are still corner conditions that are not handled correctly. For example, - // 1. What if the offset of the faster server was different? - // 2. We know that only the faster server will get to complete the COMMIT call successfully. But it is possible - // that the race to this statement is won by the slower server, and so the real segment that is in there - // is that - // of the slower server. - // In order to overcome controller restarts after the segment is moved to PinotFS, but before it is - // committed, we DO need to - // check for existing segment file and remove it. So, the block cannot be removed altogether. - // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no - // longer - // be used. - synchronized (SEGMENT_UPLOAD_LOCK) { - if (pinotFS.exists(segmentFileURI)) { - LOGGER.warn("Segment file: {} already exists. Replacing it with segment: {} from instance: {}", - segmentFileURI, segmentName, instanceId); - pinotFS.delete(segmentFileURI, true); - } - pinotFS.copyFromLocalFile(localTempFile, segmentFileURI); - } - committingSegmentDescriptor = - CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata); - committingSegmentDescriptor.setSegmentLocation(segmentFileURI.toString()); - success = true; - } catch (Exception e) { - LOGGER.error("Caught exception while committing segment: {} from instance: {}", segmentName, instanceId, e); - } finally { - FileUtils.deleteQuietly(localTempFile); - } - } - - response = _segmentCompletionManager.segmentCommitEnd(requestParams, success, false, committingSegmentDescriptor); - LOGGER.info("Response to segmentCommit: instance={}, segment={}, status={}, streamMsgOffset={}", - requestParams.getInstanceId(), requestParams.getSegmentName(), response.getStatus(), - response.getStreamPartitionMsgOffset()); - - return response.toJsonString(); - } - // This method may be called in any controller, leader or non-leader. It is used only when the server decides to use // split commit protocol for the segment commit. // TODO: remove this API. Should not upload segment via controller @@ -379,7 +290,7 @@ public class LLCSegmentCompletionHandlers { return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } - String response = _segmentCompletionManager.segmentCommitEnd(requestParams, true, true, + String response = _segmentCompletionManager.segmentCommitEnd(requestParams, CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata)) .toJsonString(); LOGGER.info("Response to segmentCommitEndWithMetadata for segment: {} is: {}", segmentName, response); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 7672addb3b..92a9adc1c0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -276,12 +276,10 @@ public class SegmentCompletionManager { * * It returns a response code to be sent back to the client. * - * If the repsonse code is not COMMIT_SUCCESS, then the caller may remove the segment that has been saved. - * - * @return + * If the response code is not COMMIT_SUCCESS, then the caller may remove the segment that has been saved. */ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, - boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) { + CommittingSegmentDescriptor committingSegmentDescriptor) { final String segmentNameStr = reqParams.getSegmentName(); final LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); final String tableName = segmentName.getTableName(); @@ -292,7 +290,7 @@ public class SegmentCompletionManager { SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED; try { fsm = lookupOrCreateFsm(segmentName, SegmentCompletionProtocol.MSG_TYPE_COMMIT); - response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit, committingSegmentDescriptor); + response = fsm.segmentCommitEnd(reqParams, committingSegmentDescriptor); } catch (Exception e) { LOGGER.error("Caught exception in segmentCommitEnd for segment {}", segmentNameStr, e); } @@ -602,7 +600,7 @@ public class SegmentCompletionManager { * the _winner. */ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, - boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) { + CommittingSegmentDescriptor committingSegmentDescriptor) { String instanceId = reqParams.getInstanceId(); StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); @@ -619,12 +617,7 @@ public class SegmentCompletionManager { _segmentName.getSegmentName(), _winner, _winningOffset); return abortAndReturnFailed(); } - if (!success) { - _logger.error("Segment upload failed"); - return abortAndReturnFailed(); - } - SegmentCompletionProtocol.Response response = - commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor); + SegmentCompletionProtocol.Response response = commitSegment(reqParams, committingSegmentDescriptor); if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) { return abortAndReturnFailed(); } else { @@ -1028,7 +1021,7 @@ public class SegmentCompletionManager { } private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams, - boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) { + CommittingSegmentDescriptor committingSegmentDescriptor) { String instanceId = reqParams.getInstanceId(); StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); @@ -1040,18 +1033,15 @@ public class SegmentCompletionManager { } _logger.info("Committing segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId); _state = State.COMMITTING; - // In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation, - // so we need to move the segment file to its permanent location first before committing the metadata. - // The committingSegmentDescriptor is then updated with the permanent segment location to be saved in metadata - // store. - if (isSplitCommit) { - try { - _segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor); - } catch (Exception e) { - _logger.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(), - e); - return SegmentCompletionProtocol.RESP_FAILED; - } + // The segment is uploaded to a unique file name indicated by segmentLocation, so we need to move the segment file + // to its permanent location first before committing the metadata. The committingSegmentDescriptor is then updated + // with the permanent segment location to be saved in metadata store. + try { + _segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor); + } catch (Exception e) { + _logger.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(), + e); + return SegmentCompletionProtocol.RESP_FAILED; } try { // Convert to a controller uri if the segment location uses local file scheme. diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index 19814928a4..2595d8debf 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.realtime; import com.google.common.base.Preconditions; import java.lang.reflect.Field; import java.util.Map; +import javax.annotation.Nullable; import org.apache.helix.HelixManager; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -125,8 +126,7 @@ public class SegmentCompletionTest { } @Test - public void testStoppedConsumeDuringCompletion() - throws Exception { + public void testStoppedConsumeDuringCompletion() { SegmentCompletionProtocol.Response response; Request.Params params; final String reason = "IAmLazy"; @@ -149,8 +149,8 @@ public class SegmentCompletionTest { .withSegmentName(_segmentNameStr).withReason(reason); response = _segmentCompletionMgr.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); - Assert.assertEquals(new LLCSegmentName(_segmentNameStr), _segmentManager._stoppedSegmentName); - Assert.assertEquals(S_3, _segmentManager._stoppedInstance); + Assert.assertEquals(_segmentManager._stoppedSegmentName, new LLCSegmentName(_segmentNameStr)); + Assert.assertEquals(_segmentManager._stoppedInstance, S_3); _segmentManager._stoppedSegmentName = null; _segmentManager._stoppedInstance = null; @@ -182,8 +182,8 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // Now the FSM should have disappeared from the map @@ -201,8 +201,7 @@ public class SegmentCompletionTest { } @Test - public void testStoppedConsumeBeforeHold() - throws Exception { + public void testStoppedConsumeBeforeHold() { SegmentCompletionProtocol.Response response; Request.Params params; final String reason = "IAmLazy"; @@ -212,8 +211,8 @@ public class SegmentCompletionTest { .withSegmentName(_segmentNameStr).withReason(reason); response = _segmentCompletionMgr.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); - Assert.assertEquals(new LLCSegmentName(_segmentNameStr), _segmentManager._stoppedSegmentName); - Assert.assertEquals(S_1, _segmentManager._stoppedInstance); + Assert.assertEquals(_segmentManager._stoppedSegmentName, new LLCSegmentName(_segmentNameStr)); + Assert.assertEquals(_segmentManager._stoppedInstance, S_1); _segmentManager._stoppedSegmentName = null; _segmentManager._stoppedInstance = null; @@ -259,8 +258,8 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // Now the FSM should have disappeared from the map @@ -279,8 +278,7 @@ public class SegmentCompletionTest { // s2 sends stoppedConsuming message, but then may have gotten restarted, so eventually we complete the segment. @Test - public void testHappyPathAfterStoppedConsuming() - throws Exception { + public void testHappyPathAfterStoppedConsuming() { SegmentCompletionProtocol.Response response; Request.Params params; _segmentCompletionMgr._seconds = 5; @@ -290,7 +288,7 @@ public class SegmentCompletionTest { response = _segmentCompletionMgr.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); Assert.assertEquals(new LLCSegmentName(_segmentNameStr), _segmentManager._stoppedSegmentName); - Assert.assertEquals(S_2, _segmentManager._stoppedInstance); + Assert.assertEquals(_segmentManager._stoppedInstance, S_2); _segmentManager._stoppedSegmentName = null; _segmentManager._stoppedInstance = null; @@ -298,28 +296,23 @@ public class SegmentCompletionTest { } @Test - public void testHappyPath() - throws Exception { + public void testHappyPath() { testHappyPath(5L); } - // Tests happy path with split commit protocol @Test - public void testHappyPathSplitCommitWithLocalFS() - throws Exception { - testHappyPathSplitCommit(5L, "/local/file", "http://null:null/segments/" + _tableName + "/" + _segmentNameStr); + public void testHappyPathWithLocalFS() { + testHappyPath(5L, "/local/file", "http://null:null/segments/" + _tableName + "/" + _segmentNameStr); } @Test - public void testHappyPathSplitCommitWithDeepstore() - throws Exception { - testHappyPathSplitCommit(5L, "fakefs:///segment1", "fakefs:///segment1"); + public void testHappyPathWithDeepStore() { + testHappyPath(5L, "fakefs:///segment1", "fakefs:///segment1"); } @Test - public void testHappyPathSplitCommitWithPeerDownloadScheme() - throws Exception { - testHappyPathSplitCommit(5L, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1", + public void testHappyPathWithPeerDownloadScheme() { + testHappyPath(5L, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1", CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1"); } @@ -393,8 +386,8 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("doNotCommitMe"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED); // Now the FSM should have aborted @@ -431,15 +424,14 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS); // And the FSM should be removed. Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr)); } - private void testHappyPathSplitCommit(long startTime, String segmentLocation, String downloadURL) - throws Exception { + private void testHappyPath(long startTime, String segmentLocation, @Nullable String downloadURL) { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -473,7 +465,6 @@ public class SegmentCompletionTest { params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr); response = _segmentCompletionMgr.segmentConsumed(params); - // TODO: Verify controller asked to do a split commit Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); // s3 comes back with new caught up offset, it should get a HOLD, since commit is not done yet. _segmentCompletionMgr._seconds += 1; @@ -491,10 +482,12 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation(segmentLocation); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); - Assert.assertEquals(_segmentManager.getSegmentZKMetadata(null, null, null).getDownloadUrl(), downloadURL); + if (downloadURL != null) { + Assert.assertEquals(_segmentManager.getSegmentZKMetadata(null, null, null).getDownloadUrl(), downloadURL); + } // Now the FSM should have disappeared from the map Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr)); @@ -510,10 +503,13 @@ public class SegmentCompletionTest { Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr)); } + private void testHappyPath(long startTime) { + testHappyPath(startTime, "location", null); + } + // Tests that we abort when the server instance comes back with a different offset than it is told to commit with @Test - public void testCommitDifferentOffsetSplitCommit() - throws Exception { + public void testCommitDifferentOffset() { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -547,8 +543,8 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s3Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED); // Now the FSM should have disappeared from the map @@ -565,75 +561,6 @@ public class SegmentCompletionTest { Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr)); } - public void testHappyPath(long startTime) - throws Exception { - SegmentCompletionProtocol.Response response; - Request.Params params; - // s1 sends offset of 20, gets HOLD at t = 5s; - _segmentCompletionMgr._seconds = startTime; - params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentConsumed(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); - // s2 sends offset of 40, gets HOLD - _segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentConsumed(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); - // s3 sends offset of 30, gets catchup to 40 - _segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s3Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentConsumed(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - verifyOffset(response, _s2Offset); - // Now s1 comes back, and is asked to catchup. - _segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentConsumed(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP); - // s2 is asked to commit. - _segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentConsumed(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); - // s3 comes back with new caught up offset, it should get a HOLD, since commit is not done yet. - _segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentConsumed(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); - // s2 executes a successful commit - _segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentCommitStart(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); - - _segmentCompletionMgr._seconds += 5; - params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); - - // Now the FSM should have disappeared from the map - Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr)); - - // Now if s3 or s1 come back, they are asked to keep the segment they have. - _segmentCompletionMgr._seconds += 1; - params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); - response = _segmentCompletionMgr.segmentConsumed(params); - Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP); - - // And the FSM should be removed. - Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr)); - } - @Test public void testControllerNotConnected() throws Exception { @@ -649,8 +576,7 @@ public class SegmentCompletionTest { } @Test - public void testWinnerOnTimeLimit() - throws Exception { + public void testWinnerOnTimeLimit() { SegmentCompletionProtocol.Response response; Request.Params params; _segmentCompletionMgr._seconds = 10L; @@ -661,8 +587,7 @@ public class SegmentCompletionTest { } @Test - public void testWinnerOnForceCommit() - throws Exception { + public void testWinnerOnForceCommit() { SegmentCompletionProtocol.Response response; Request.Params params; // S1 comes to force commit @@ -714,8 +639,8 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // S3 comes back at the latest offset @@ -736,8 +661,7 @@ public class SegmentCompletionTest { } @Test - public void testWinnerOnRowLimit() - throws Exception { + public void testWinnerOnRowLimit() { SegmentCompletionProtocol.Response response; Request.Params params; _segmentCompletionMgr._seconds = 10L; @@ -767,8 +691,8 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // We ask S2 to keep the segment params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s1Offset.toString()) @@ -782,26 +706,12 @@ public class SegmentCompletionTest { Assert.assertEquals(response.getStatus(), ControllerResponseStatus.DISCARD); } - // Tests that when server is delayed(Stalls for a hour), when server comes back, we commit successfully. @Test - public void testDelayedServerSplitCommit() - throws Exception { - testDelayedServer(true); - } - - @Test - public void testDelayedServer() - throws Exception { - testDelayedServer(false); - } - - public void testDelayedServer(boolean isSplitCommit) - throws Exception { + public void testDelayedServer() { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; - final int startTimeSecs = 5; - _segmentCompletionMgr._seconds = startTimeSecs; + _segmentCompletionMgr._seconds = 5; params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString()) .withSegmentName(_segmentNameStr); response = _segmentCompletionMgr.segmentConsumed(params); @@ -840,7 +750,7 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withSegmentLocation("location"); - response = _segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit, + response = _segmentCompletionMgr.segmentCommitEnd(params, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // Now the FSM should have disappeared from the map @@ -858,13 +768,11 @@ public class SegmentCompletionTest { // We test the case where all servers go silent after controller asks one of them commit @Test - public void testDeadServers() - throws Exception { + public void testDeadServers() { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; - final int startTimeSecs = 5; - _segmentCompletionMgr._seconds = startTimeSecs; + _segmentCompletionMgr._seconds = 5; params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString()) .withSegmentName(_segmentNameStr); response = _segmentCompletionMgr.segmentConsumed(params); @@ -914,13 +822,11 @@ public class SegmentCompletionTest { // We test the case when the committer is asked to commit, but they never come back. @Test - public void testCommitterFailure() - throws Exception { + public void testCommitterFailure() { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; - final int startTimeSecs = 5; - _segmentCompletionMgr._seconds = startTimeSecs; + _segmentCompletionMgr._seconds = 5; params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString()) .withSegmentName(_segmentNameStr); response = _segmentCompletionMgr.segmentConsumed(params); @@ -993,8 +899,7 @@ public class SegmentCompletionTest { } @Test - public void testHappyPathSlowCommit() - throws Exception { + public void testHappyPathSlowCommit() { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 1509242466s; @@ -1059,16 +964,15 @@ public class SegmentCompletionTest { long commitTimeMs = (_segmentCompletionMgr._seconds - startTime) * 1000; Assert.assertEquals(_commitTimeMap.get(tableName).longValue(), commitTimeMs); _segmentCompletionMgr._seconds += 55; - response = _segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = _segmentCompletionMgr.segmentCommitEnd(params, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // now FSM should be out of the map. Assert.assertFalse((_fsmMap.containsKey(_segmentNameStr))); } @Test - public void testFailedSlowCommit() - throws Exception { + public void testFailedSlowCommit() { SegmentCompletionProtocol.Response response; Request.Params params; final String tableName = new LLCSegmentName(_segmentNameStr).getTableName(); @@ -1127,8 +1031,7 @@ public class SegmentCompletionTest { } @Test - public void testLeaseTooLong() - throws Exception { + public void testLeaseTooLong() { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -1176,8 +1079,8 @@ public class SegmentCompletionTest { final int leaseTimeSec = 20; // Lease will not be granted if the time taken so far plus lease time exceeds the max allowabale. - while (_segmentCompletionMgr._seconds + leaseTimeSec <= startTime + SegmentCompletionManager - .getMaxCommitTimeForAllSegmentsSeconds()) { + while (_segmentCompletionMgr._seconds + leaseTimeSec + <= startTime + SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds()) { params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr).withExtraTimeSec(leaseTimeSec); response = _segmentCompletionMgr.extendBuildTime(params); @@ -1252,7 +1155,7 @@ public class SegmentCompletionTest { params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr); response = _segmentCompletionMgr.segmentCommitStart(params); - Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); + Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back. params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) @@ -1322,7 +1225,7 @@ public class SegmentCompletionTest { params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) .withSegmentName(_segmentNameStr); response = _segmentCompletionMgr.segmentCommitStart(params); - Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); + Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back. params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) @@ -1404,8 +1307,8 @@ public class SegmentCompletionTest { } public static class MockSegmentCompletionManager extends SegmentCompletionManager { + private final boolean _isLeader; public long _seconds; - private boolean _isLeader; protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader, boolean isConnected) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org