This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a616d8ce5 Cleanup non split commit code (#14559)
3a616d8ce5 is described below

commit 3a616d8ce525bf9e611cde273373e57a768ce2de
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Nov 29 20:02:35 2024 -0800

    Cleanup non split commit code (#14559)
---
 .../resources/LLCSegmentCompletionHandlers.java    |  91 +--------
 .../core/realtime/SegmentCompletionManager.java    |  40 ++--
 .../helix/core/realtime/SegmentCompletionTest.java | 219 ++++++---------------
 3 files changed, 77 insertions(+), 273 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 5e5adb0d80..c459436225 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -53,7 +53,6 @@ import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
@@ -199,94 +198,6 @@ public class LLCSegmentCompletionHandlers {
     return response;
   }
 
-  // Remove after releasing 1.1 (server always use split commit)
-  @Deprecated
-  @POST
-  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT)
-  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.COMMIT_SEGMENT)
-  @Authenticate(AccessType.CREATE)
-  @Consumes(MediaType.MULTIPART_FORM_DATA)
-  @Produces(MediaType.APPLICATION_JSON)
-  public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
-      @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
-      @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
-      @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
-      @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
-      @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
-      @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, 
FormDataMultiPart multiPart) {
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
-        .withInstanceId(instanceId)
-        .withSegmentName(segmentName)
-        .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
-        .withSegmentSizeBytes(segmentSizeBytes)
-        .withBuildTimeMillis(buildTimeMillis)
-        .withWaitTimeMillis(waitTimeMillis)
-        .withNumRows(numRows)
-        .withMemoryUsedBytes(memoryUsedBytes);
-    LOGGER.info("Processing segmentCommit: {}", requestParams);
-
-    SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentCommitStart(requestParams);
-
-    CommittingSegmentDescriptor committingSegmentDescriptor =
-        
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams);
-    boolean success = false;
-
-    if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
-      File localTempFile = null;
-      try {
-        localTempFile = extractSegmentFromFormToLocalTempFile(multiPart, 
segmentName);
-        SegmentMetadataImpl segmentMetadata = 
extractMetadataFromLocalSegmentFile(localTempFile);
-        // Store the segment file to Pinot FS.
-        String rawTableName = new LLCSegmentName(segmentName).getTableName();
-        URI segmentFileURI =
-            
URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
 rawTableName,
-                URIUtils.encode(segmentName));
-        PinotFS pinotFS = PinotFSFactory.create(segmentFileURI.getScheme());
-        // Multiple threads can reach this point at the same time, if the 
following scenario happens
-        // The server that was asked to commit did so very slowly (due to 
network speeds). Meanwhile the FSM in
-        // SegmentCompletionManager timed out, and allowed another server to 
commit, which did so very quickly (somehow
-        // the network speeds changed). The second server made it through the 
FSM and reached this point.
-        // The synchronization below takes care that exactly one file gets 
moved in place.
-        // There are still corner conditions that are not handled correctly. 
For example,
-        // 1. What if the offset of the faster server was different?
-        // 2. We know that only the faster server will get to complete the 
COMMIT call successfully. But it is possible
-        //    that the race to this statement is won by the slower server, and 
so the real segment that is in there
-        //    is that
-        //    of the slower server.
-        // In order to overcome controller restarts after the segment is moved 
to PinotFS, but before it is
-        // committed, we DO need to
-        // check for existing segment file and remove it. So, the block cannot 
be removed altogether.
-        // For now, we live with these corner cases. Once we have split-commit 
enabled and working, this code will no
-        // longer
-        // be used.
-        synchronized (SEGMENT_UPLOAD_LOCK) {
-          if (pinotFS.exists(segmentFileURI)) {
-            LOGGER.warn("Segment file: {} already exists. Replacing it with 
segment: {} from instance: {}",
-                segmentFileURI, segmentName, instanceId);
-            pinotFS.delete(segmentFileURI, true);
-          }
-          pinotFS.copyFromLocalFile(localTempFile, segmentFileURI);
-        }
-        committingSegmentDescriptor =
-            
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata);
-        
committingSegmentDescriptor.setSegmentLocation(segmentFileURI.toString());
-        success = true;
-      } catch (Exception e) {
-        LOGGER.error("Caught exception while committing segment: {} from 
instance: {}", segmentName, instanceId, e);
-      } finally {
-        FileUtils.deleteQuietly(localTempFile);
-      }
-    }
-
-    response = _segmentCompletionManager.segmentCommitEnd(requestParams, 
success, false, committingSegmentDescriptor);
-    LOGGER.info("Response to segmentCommit: instance={}, segment={}, 
status={}, streamMsgOffset={}",
-        requestParams.getInstanceId(), requestParams.getSegmentName(), 
response.getStatus(),
-        response.getStreamPartitionMsgOffset());
-
-    return response.toJsonString();
-  }
-
   // This method may be called in any controller, leader or non-leader. It is 
used only when the server decides to use
   // split commit protocol for the segment commit.
   // TODO: remove this API. Should not upload segment via controller
@@ -379,7 +290,7 @@ public class LLCSegmentCompletionHandlers {
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
 
-    String response = 
_segmentCompletionManager.segmentCommitEnd(requestParams, true, true,
+    String response = _segmentCompletionManager.segmentCommitEnd(requestParams,
             
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata))
         .toJsonString();
     LOGGER.info("Response to segmentCommitEndWithMetadata for segment: {} is: 
{}", segmentName, response);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7672addb3b..92a9adc1c0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -276,12 +276,10 @@ public class SegmentCompletionManager {
    *
    * It returns a response code to be sent back to the client.
    *
-   * If the repsonse code is not COMMIT_SUCCESS, then the caller may remove 
the segment that has been saved.
-   *
-   * @return
+   * If the response code is not COMMIT_SUCCESS, then the caller may remove 
the segment that has been saved.
    */
   public SegmentCompletionProtocol.Response 
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
-      boolean success, boolean isSplitCommit, CommittingSegmentDescriptor 
committingSegmentDescriptor) {
+      CommittingSegmentDescriptor committingSegmentDescriptor) {
     final String segmentNameStr = reqParams.getSegmentName();
     final LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
     final String tableName = segmentName.getTableName();
@@ -292,7 +290,7 @@ public class SegmentCompletionManager {
     SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.RESP_FAILED;
     try {
       fsm = lookupOrCreateFsm(segmentName, 
SegmentCompletionProtocol.MSG_TYPE_COMMIT);
-      response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit, 
committingSegmentDescriptor);
+      response = fsm.segmentCommitEnd(reqParams, committingSegmentDescriptor);
     } catch (Exception e) {
       LOGGER.error("Caught exception in segmentCommitEnd for segment {}", 
segmentNameStr, e);
     }
@@ -602,7 +600,7 @@ public class SegmentCompletionManager {
      * the _winner.
      */
     public SegmentCompletionProtocol.Response 
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
-        boolean success, boolean isSplitCommit, CommittingSegmentDescriptor 
committingSegmentDescriptor) {
+        CommittingSegmentDescriptor committingSegmentDescriptor) {
       String instanceId = reqParams.getInstanceId();
       StreamPartitionMsgOffset offset =
           
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
@@ -619,12 +617,7 @@ public class SegmentCompletionManager {
               _segmentName.getSegmentName(), _winner, _winningOffset);
           return abortAndReturnFailed();
         }
-        if (!success) {
-          _logger.error("Segment upload failed");
-          return abortAndReturnFailed();
-        }
-        SegmentCompletionProtocol.Response response =
-            commitSegment(reqParams, isSplitCommit, 
committingSegmentDescriptor);
+        SegmentCompletionProtocol.Response response = commitSegment(reqParams, 
committingSegmentDescriptor);
         if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
           return abortAndReturnFailed();
         } else {
@@ -1028,7 +1021,7 @@ public class SegmentCompletionManager {
     }
 
     private SegmentCompletionProtocol.Response 
commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
-        boolean isSplitCommit, CommittingSegmentDescriptor 
committingSegmentDescriptor) {
+        CommittingSegmentDescriptor committingSegmentDescriptor) {
       String instanceId = reqParams.getInstanceId();
       StreamPartitionMsgOffset offset =
           
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
@@ -1040,18 +1033,15 @@ public class SegmentCompletionManager {
       }
       _logger.info("Committing segment {} at offset {} winner {}", 
_segmentName.getSegmentName(), offset, instanceId);
       _state = State.COMMITTING;
-      // In case of splitCommit, the segment is uploaded to a unique file name 
indicated by segmentLocation,
-      // so we need to move the segment file to its permanent location first 
before committing the metadata.
-      // The committingSegmentDescriptor is then updated with the permanent 
segment location to be saved in metadata
-      // store.
-      if (isSplitCommit) {
-        try {
-          _segmentManager.commitSegmentFile(_realtimeTableName, 
committingSegmentDescriptor);
-        } catch (Exception e) {
-          _logger.error("Caught exception while committing segment file for 
segment: {}", _segmentName.getSegmentName(),
-              e);
-          return SegmentCompletionProtocol.RESP_FAILED;
-        }
+      // The segment is uploaded to a unique file name indicated by 
segmentLocation, so we need to move the segment file
+      // to its permanent location first before committing the metadata. The 
committingSegmentDescriptor is then updated
+      // with the permanent segment location to be saved in metadata store.
+      try {
+        _segmentManager.commitSegmentFile(_realtimeTableName, 
committingSegmentDescriptor);
+      } catch (Exception e) {
+        _logger.error("Caught exception while committing segment file for 
segment: {}", _segmentName.getSegmentName(),
+            e);
+        return SegmentCompletionProtocol.RESP_FAILED;
       }
       try {
         // Convert to a controller uri if the segment location uses local file 
scheme.
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 19814928a4..2595d8debf 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.realtime;
 import com.google.common.base.Preconditions;
 import java.lang.reflect.Field;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -125,8 +126,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testStoppedConsumeDuringCompletion()
-      throws Exception {
+  public void testStoppedConsumeDuringCompletion() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     final String reason = "IAmLazy";
@@ -149,8 +149,8 @@ public class SegmentCompletionTest {
         .withSegmentName(_segmentNameStr).withReason(reason);
     response = _segmentCompletionMgr.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
-    Assert.assertEquals(new LLCSegmentName(_segmentNameStr), 
_segmentManager._stoppedSegmentName);
-    Assert.assertEquals(S_3, _segmentManager._stoppedInstance);
+    Assert.assertEquals(_segmentManager._stoppedSegmentName, new 
LLCSegmentName(_segmentNameStr));
+    Assert.assertEquals(_segmentManager._stoppedInstance, S_3);
     _segmentManager._stoppedSegmentName = null;
     _segmentManager._stoppedInstance = null;
 
@@ -182,8 +182,8 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -201,8 +201,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testStoppedConsumeBeforeHold()
-      throws Exception {
+  public void testStoppedConsumeBeforeHold() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     final String reason = "IAmLazy";
@@ -212,8 +211,8 @@ public class SegmentCompletionTest {
         .withSegmentName(_segmentNameStr).withReason(reason);
     response = _segmentCompletionMgr.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
-    Assert.assertEquals(new LLCSegmentName(_segmentNameStr), 
_segmentManager._stoppedSegmentName);
-    Assert.assertEquals(S_1, _segmentManager._stoppedInstance);
+    Assert.assertEquals(_segmentManager._stoppedSegmentName, new 
LLCSegmentName(_segmentNameStr));
+    Assert.assertEquals(_segmentManager._stoppedInstance, S_1);
     _segmentManager._stoppedSegmentName = null;
     _segmentManager._stoppedInstance = null;
 
@@ -259,8 +258,8 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -279,8 +278,7 @@ public class SegmentCompletionTest {
 
   // s2 sends stoppedConsuming message, but then may have gotten restarted, so 
eventually we complete the segment.
   @Test
-  public void testHappyPathAfterStoppedConsuming()
-      throws Exception {
+  public void testHappyPathAfterStoppedConsuming() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     _segmentCompletionMgr._seconds = 5;
@@ -290,7 +288,7 @@ public class SegmentCompletionTest {
     response = _segmentCompletionMgr.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
     Assert.assertEquals(new LLCSegmentName(_segmentNameStr), 
_segmentManager._stoppedSegmentName);
-    Assert.assertEquals(S_2, _segmentManager._stoppedInstance);
+    Assert.assertEquals(_segmentManager._stoppedInstance, S_2);
     _segmentManager._stoppedSegmentName = null;
     _segmentManager._stoppedInstance = null;
 
@@ -298,28 +296,23 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testHappyPath()
-      throws Exception {
+  public void testHappyPath() {
     testHappyPath(5L);
   }
 
-  // Tests happy path with split commit protocol
   @Test
-  public void testHappyPathSplitCommitWithLocalFS()
-      throws Exception {
-    testHappyPathSplitCommit(5L, "/local/file", "http://null:null/segments/" + 
_tableName + "/" + _segmentNameStr);
+  public void testHappyPathWithLocalFS() {
+    testHappyPath(5L, "/local/file", "http://null:null/segments/" + _tableName 
+ "/" + _segmentNameStr);
   }
 
   @Test
-  public void testHappyPathSplitCommitWithDeepstore()
-      throws Exception {
-    testHappyPathSplitCommit(5L, "fakefs:///segment1", "fakefs:///segment1");
+  public void testHappyPathWithDeepStore() {
+    testHappyPath(5L, "fakefs:///segment1", "fakefs:///segment1");
   }
 
   @Test
-  public void testHappyPathSplitCommitWithPeerDownloadScheme()
-      throws Exception {
-    testHappyPathSplitCommit(5L, 
CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1",
+  public void testHappyPathWithPeerDownloadScheme() {
+    testHappyPath(5L, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + 
"/segment1",
         CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1");
   }
 
@@ -393,8 +386,8 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("doNotCommitMe");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have aborted
@@ -431,15 +424,14 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.COMMIT_SUCCESS);
     // And the FSM should be removed.
     Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
   }
 
-  private void testHappyPathSplitCommit(long startTime, String 
segmentLocation, String downloadURL)
-      throws Exception {
+  private void testHappyPath(long startTime, String segmentLocation, @Nullable 
String downloadURL) {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -473,7 +465,6 @@ public class SegmentCompletionTest {
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr);
     response = _segmentCompletionMgr.segmentConsumed(params);
-    // TODO: Verify controller asked to do a split commit
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
     // s3 comes back with new caught up offset, it should get a HOLD, since 
commit is not done yet.
     _segmentCompletionMgr._seconds += 1;
@@ -491,10 +482,12 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation(segmentLocation);
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
-    Assert.assertEquals(_segmentManager.getSegmentZKMetadata(null, null, 
null).getDownloadUrl(), downloadURL);
+    if (downloadURL != null) {
+      Assert.assertEquals(_segmentManager.getSegmentZKMetadata(null, null, 
null).getDownloadUrl(), downloadURL);
+    }
 
     // Now the FSM should have disappeared from the map
     Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
@@ -510,10 +503,13 @@ public class SegmentCompletionTest {
     Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
   }
 
+  private void testHappyPath(long startTime) {
+    testHappyPath(startTime, "location", null);
+  }
+
   // Tests that we abort when the server instance comes back with a different 
offset than it is told to commit with
   @Test
-  public void testCommitDifferentOffsetSplitCommit()
-      throws Exception {
+  public void testCommitDifferentOffset() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -547,8 +543,8 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s3Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have disappeared from the map
@@ -565,75 +561,6 @@ public class SegmentCompletionTest {
     Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));
   }
 
-  public void testHappyPath(long startTime)
-      throws Exception {
-    SegmentCompletionProtocol.Response response;
-    Request.Params params;
-    // s1 sends offset of 20, gets HOLD at t = 5s;
-    _segmentCompletionMgr._seconds = startTime;
-    params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentConsumed(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
-    // s2 sends offset of 40, gets HOLD
-    _segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentConsumed(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
-    // s3 sends offset of 30, gets catchup to 40
-    _segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s3Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentConsumed(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    verifyOffset(response, _s2Offset);
-    // Now s1 comes back, and is asked to catchup.
-    _segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentConsumed(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
-    // s2 is asked to commit.
-    _segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentConsumed(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
-    // s3 comes back with new caught up offset, it should get a HOLD, since 
commit is not done yet.
-    _segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentConsumed(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
-    // s2 executes a successful commit
-    _segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentCommitStart(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
-
-    _segmentCompletionMgr._seconds += 5;
-    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
-
-    // Now the FSM should have disappeared from the map
-    Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
-
-    // Now if s3 or s1 come back, they are asked to keep the segment they have.
-    _segmentCompletionMgr._seconds += 1;
-    params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
-    response = _segmentCompletionMgr.segmentConsumed(params);
-    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
-
-    // And the FSM should be removed.
-    Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
-  }
-
   @Test
   public void testControllerNotConnected()
       throws Exception {
@@ -649,8 +576,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testWinnerOnTimeLimit()
-      throws Exception {
+  public void testWinnerOnTimeLimit() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     _segmentCompletionMgr._seconds = 10L;
@@ -661,8 +587,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testWinnerOnForceCommit()
-      throws Exception {
+  public void testWinnerOnForceCommit() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // S1 comes to force commit
@@ -714,8 +639,8 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // S3 comes back at the latest offset
@@ -736,8 +661,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testWinnerOnRowLimit()
-      throws Exception {
+  public void testWinnerOnRowLimit() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     _segmentCompletionMgr._seconds = 10L;
@@ -767,8 +691,8 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // We ask S2 to keep the segment
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s1Offset.toString())
@@ -782,26 +706,12 @@ public class SegmentCompletionTest {
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.DISCARD);
   }
 
-  // Tests that when server is delayed(Stalls for a hour), when server comes back, we commit successfully.
   @Test
-  public void testDelayedServerSplitCommit()
-      throws Exception {
-    testDelayedServer(true);
-  }
-
-  @Test
-  public void testDelayedServer()
-      throws Exception {
-    testDelayedServer(false);
-  }
-
-  public void testDelayedServer(boolean isSplitCommit)
-      throws Exception {
+  public void testDelayedServer() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
-    final int startTimeSecs = 5;
-    _segmentCompletionMgr._seconds = startTimeSecs;
+    _segmentCompletionMgr._seconds = 5;
     params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
         .withSegmentName(_segmentNameStr);
     response = _segmentCompletionMgr.segmentConsumed(params);
@@ -840,7 +750,7 @@ public class SegmentCompletionTest {
     _segmentCompletionMgr._seconds += 5;
     params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr).withSegmentLocation("location");
-    response = _segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit,
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
         CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // Now the FSM should have disappeared from the map
@@ -858,13 +768,11 @@ public class SegmentCompletionTest {
 
   // We test the case where all servers go silent after controller asks one of them commit
   @Test
-  public void testDeadServers()
-      throws Exception {
+  public void testDeadServers() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
-    final int startTimeSecs = 5;
-    _segmentCompletionMgr._seconds = startTimeSecs;
+    _segmentCompletionMgr._seconds = 5;
     params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
         .withSegmentName(_segmentNameStr);
     response = _segmentCompletionMgr.segmentConsumed(params);
@@ -914,13 +822,11 @@ public class SegmentCompletionTest {
 
   // We test the case when the committer is asked to commit, but they never come back.
   @Test
-  public void testCommitterFailure()
-      throws Exception {
+  public void testCommitterFailure() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
-    final int startTimeSecs = 5;
-    _segmentCompletionMgr._seconds = startTimeSecs;
+    _segmentCompletionMgr._seconds = 5;
     params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
         .withSegmentName(_segmentNameStr);
     response = _segmentCompletionMgr.segmentConsumed(params);
@@ -993,8 +899,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testHappyPathSlowCommit()
-      throws Exception {
+  public void testHappyPathSlowCommit() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 1509242466s;
@@ -1059,16 +964,15 @@ public class SegmentCompletionTest {
     long commitTimeMs = (_segmentCompletionMgr._seconds - startTime) * 1000;
     Assert.assertEquals(_commitTimeMap.get(tableName).longValue(), commitTimeMs);
     _segmentCompletionMgr._seconds += 55;
-    response = _segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = _segmentCompletionMgr.segmentCommitEnd(params,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // now FSM should be out of the map.
     Assert.assertFalse((_fsmMap.containsKey(_segmentNameStr)));
   }
 
   @Test
-  public void testFailedSlowCommit()
-      throws Exception {
+  public void testFailedSlowCommit() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     final String tableName = new LLCSegmentName(_segmentNameStr).getTableName();
@@ -1127,8 +1031,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testLeaseTooLong()
-      throws Exception {
+  public void testLeaseTooLong() {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -1176,8 +1079,8 @@ public class SegmentCompletionTest {
 
     final int leaseTimeSec = 20;
     // Lease will not be granted if the time taken so far plus lease time exceeds the max allowabale.
-    while (_segmentCompletionMgr._seconds + leaseTimeSec <= startTime + SegmentCompletionManager
-        .getMaxCommitTimeForAllSegmentsSeconds()) {
+    while (_segmentCompletionMgr._seconds + leaseTimeSec
+        <= startTime + SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds()) {
       params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
           .withSegmentName(_segmentNameStr).withExtraTimeSec(leaseTimeSec);
       response = _segmentCompletionMgr.extendBuildTime(params);
@@ -1252,7 +1155,7 @@ public class SegmentCompletionTest {
     params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr);
     response = _segmentCompletionMgr.segmentCommitStart(params);
-    Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
+    Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
 
     // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back.
     params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
@@ -1322,7 +1225,7 @@ public class SegmentCompletionTest {
     params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
         .withSegmentName(_segmentNameStr);
     response = _segmentCompletionMgr.segmentCommitStart(params);
-    Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
+    Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
 
     // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back.
     params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
@@ -1404,8 +1307,8 @@ public class SegmentCompletionTest {
   }
 
   public static class MockSegmentCompletionManager extends SegmentCompletionManager {
+    private final boolean _isLeader;
     public long _seconds;
-    private boolean _isLeader;
 
     protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
         boolean isConnected) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to