This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 159afb7820 Cleanup segment upload logic and allow validation on 
real-time table (#8695)
159afb7820 is described below

commit 159afb7820c30f5eadd75a973ccdc1d99f4e2539
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon May 16 11:14:34 2022 -0700

    Cleanup segment upload logic and allow validation on real-time table (#8695)
    
    - Cleanup the segment upload logic and add more checks on the parameters
    - Refactor the segment validator to a util class to make it work on both 
metadata push and real-time table
    - Refine the return code documentation of the segment upload API
---
 .../org/apache/pinot/common/utils/URIUtils.java    |  14 +-
 .../PinotSegmentUploadDownloadRestletResource.java | 262 +++++++++++----------
 .../api/upload/SegmentValidationUtils.java         |  94 ++++++++
 .../controller/api/upload/SegmentValidator.java    | 122 ----------
 .../pinot/controller/api/upload/ZKOperator.java    | 116 ++++-----
 .../helix/core/PinotHelixResourceManager.java      |  12 -
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  16 +-
 .../helix/core/util/ZKMetadataUtils.java           |  12 +-
 .../controller/api/upload/ZKOperatorTest.java      |  40 ++--
 .../helix/core/realtime/SegmentCompletionTest.java |  10 +-
 10 files changed, 333 insertions(+), 365 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
index b2617a3972..042427b772 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
@@ -34,11 +34,9 @@ public class URIUtils {
   }
 
   /**
-   * Returns the URI for the given base path and optional parts, appends the 
local (file) scheme to the URI if no
-   * scheme exists. All the parts will be appended to the base path with the 
file separator.
+   * Returns the URI for the given path, appends the local (file) scheme to 
the URI if no scheme exists.
    */
-  public static URI getUri(String basePath, String... parts) {
-    String path = getPath(basePath, parts);
+  public static URI getUri(String path) {
     try {
       URI uri = new URI(path);
       if (uri.getScheme() != null) {
@@ -51,6 +49,14 @@ public class URIUtils {
     }
   }
 
+  /**
+   * Returns the URI for the given base path and optional parts, appends the 
local (file) scheme to the URI if no
+   * scheme exists. All the parts will be appended to the base path with the 
file separator.
+   */
+  public static URI getUri(String basePath, String... parts) {
+    return getUri(getPath(basePath, parts));
+  }
+
   /**
    * Returns the path for the given base path and optional parts. All the 
parts will be appended to the base path with
    * the file separator.
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 16f2bf1865..6739bb9cc5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.controller.api.resources;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -59,6 +59,7 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -74,12 +75,13 @@ import 
org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
-import org.apache.pinot.controller.api.upload.SegmentValidator;
+import org.apache.pinot.controller.api.upload.SegmentValidationUtils;
 import org.apache.pinot.controller.api.upload.ZKOperator;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.metadata.DefaultMetadataExtractor;
 import org.apache.pinot.core.metadata.MetadataExtractorFactory;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -186,50 +188,78 @@ public class PinotSegmentUploadDownloadRestletResource {
     return builder.build();
   }
 
-  private SuccessResponse uploadSegment(@Nullable String tableName, TableType 
tableType, FormDataMultiPart multiPart,
-      boolean enableParallelPushProtection, HttpHeaders headers, Request 
request, boolean moveSegmentToFinalLocation,
-      boolean allowRefresh) {
-    String uploadTypeStr = null;
-    String crypterClassNameInHeader = null;
-    String downloadUri = null;
-    String ingestionDescriptor = null;
-    if (headers != null) {
-      extractHttpHeader(headers, 
CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
-      extractHttpHeader(headers, 
CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
-      ingestionDescriptor = extractHttpHeader(headers, 
CommonConstants.Controller.INGESTION_DESCRIPTOR);
-      uploadTypeStr = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
-      crypterClassNameInHeader = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.CRYPTER);
-      downloadUri = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
+  private SuccessResponse uploadSegment(@Nullable String tableName, TableType 
tableType,
+      @Nullable FormDataMultiPart multiPart, boolean 
moveSegmentToFinalLocation, boolean enableParallelPushProtection,
+      boolean allowRefresh, HttpHeaders headers, Request request) {
+    if (StringUtils.isNotEmpty(tableName)) {
+      TableType tableTypeFromTableName = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+      if (tableTypeFromTableName != null && tableTypeFromTableName != 
tableType) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Table name: %s does not match table type: %s", 
tableName, tableType),
+            Response.Status.BAD_REQUEST);
+      }
     }
+
+    // TODO: Consider validating the segment name and table name from the 
header against the actual segment
+    extractHttpHeader(headers, 
CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
+    extractHttpHeader(headers, 
CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
+
+    String uploadTypeStr = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+    String downloadURI = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
+    String crypterClassNameInHeader = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.CRYPTER);
+    String ingestionDescriptor = extractHttpHeader(headers, 
CommonConstants.Controller.INGESTION_DESCRIPTOR);
+
     File tempEncryptedFile = null;
     File tempDecryptedFile = null;
     File tempSegmentDir = null;
     try {
       ControllerFilePathProvider provider = 
ControllerFilePathProvider.getInstance();
       String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
-      tempDecryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName);
       tempEncryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName + ENCRYPTED_SUFFIX);
+      tempDecryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName);
       tempSegmentDir = new File(provider.getUntarredFileTempDir(), 
tempFileName);
 
-      boolean uploadedSegmentIsEncrypted = 
!Strings.isNullOrEmpty(crypterClassNameInHeader);
+      boolean uploadedSegmentIsEncrypted = 
StringUtils.isNotEmpty(crypterClassNameInHeader);
       FileUploadDownloadClient.FileUploadType uploadType = 
getUploadType(uploadTypeStr);
-      File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : 
tempDecryptedFile;
+      File destFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : 
tempDecryptedFile;
       long segmentSizeInBytes;
       switch (uploadType) {
-        case URI:
-          downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
-          segmentSizeInBytes = dstFile.length();
-          break;
         case SEGMENT:
-          createSegmentFileFromMultipart(multiPart, dstFile);
-          segmentSizeInBytes = dstFile.length();
+          if (multiPart == null) {
+            throw new ControllerApplicationException(LOGGER,
+                "Segment file (as multipart/form-data) is required for SEGMENT 
upload mode",
+                Response.Status.BAD_REQUEST);
+          }
+          if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI)) 
{
+            throw new ControllerApplicationException(LOGGER,
+                "Download URI is required if segment should not be copied to 
the deep store",
+                Response.Status.BAD_REQUEST);
+          }
+          createSegmentFileFromMultipart(multiPart, destFile);
+          segmentSizeInBytes = destFile.length();
+          break;
+        case URI:
+          if (StringUtils.isEmpty(downloadURI)) {
+            throw new ControllerApplicationException(LOGGER, "Download URI is 
required for URI upload mode",
+                Response.Status.BAD_REQUEST);
+          }
+          downloadSegmentFileFromURI(downloadURI, destFile, tableName);
+          segmentSizeInBytes = destFile.length();
           break;
         case METADATA:
+          if (multiPart == null) {
+            throw new ControllerApplicationException(LOGGER,
+                "Segment metadata file (as multipart/form-data) is required 
for METADATA upload mode",
+                Response.Status.BAD_REQUEST);
+          }
+          if (StringUtils.isEmpty(downloadURI)) {
+            throw new ControllerApplicationException(LOGGER, "Download URI is 
required for METADATA upload mode",
+                Response.Status.BAD_REQUEST);
+          }
           moveSegmentToFinalLocation = false;
-          Preconditions.checkState(downloadUri != null, "Download URI is 
required in segment metadata upload mode");
-          createSegmentFileFromMultipart(multiPart, dstFile);
+          createSegmentFileFromMultipart(multiPart, destFile);
           try {
-            URI segmentURI = new URI(downloadUri);
+            URI segmentURI = new URI(downloadURI);
             PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
             segmentSizeInBytes = pinotFS.length(segmentURI);
           } catch (Exception e) {
@@ -238,7 +268,8 @@ public class PinotSegmentUploadDownloadRestletResource {
           }
           break;
         default:
-          throw new UnsupportedOperationException("Unsupported upload type: " 
+ uploadType);
+          throw new ControllerApplicationException(LOGGER, "Unsupported upload 
type: " + uploadType,
+              Response.Status.BAD_REQUEST);
       }
 
       if (uploadedSegmentIsEncrypted) {
@@ -253,67 +284,70 @@ public class PinotSegmentUploadDownloadRestletResource {
 
       // Fetch table name. Try to derive the table name from the parameter and 
then from segment metadata
       String rawTableName;
-      if (tableName != null && !tableName.isEmpty()) {
+      if (StringUtils.isNotEmpty(tableName)) {
         rawTableName = TableNameBuilder.extractRawTableName(tableName);
-        LOGGER.info("Uploading a segment {} to table: {}, push type {}, 
(Derived from API parameter)", segmentName,
-            tableName, uploadType);
       } else {
         // TODO: remove this when we completely deprecate the table name from 
segment metadata
         rawTableName = segmentMetadata.getTableName();
-        LOGGER.info("Uploading a segment {} to table: {}, push type {}, 
(Derived from segment metadata)", segmentName,
-            tableName, uploadType);
+        LOGGER.warn("Table name is not provided when uploading segment: {} for 
table: {}", segmentName, rawTableName);
       }
-
       String tableNameWithType;
       if (tableType == TableType.OFFLINE) {
         tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
       } else {
-        if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) {
-          throw new UnsupportedOperationException(
-              "Upload segment to non-upsert realtime table is not supported " 
+ rawTableName);
-        }
         tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+        if (!_pinotHelixResourceManager.isUpsertTable(tableNameWithType)) {
+          throw new ControllerApplicationException(LOGGER,
+              "Cannot upload segment to non-upsert real-time table: " + 
tableNameWithType, Response.Status.FORBIDDEN);
+        }
       }
 
       String clientAddress = 
InetAddress.getByName(request.getRemoteAddr()).getHostName();
-      LOGGER.info("Processing upload request for segment: {} of table: {} from 
client: {}, ingestion descriptor: {}",
-          segmentName, tableNameWithType, clientAddress, ingestionDescriptor);
-
-      // Skip segment validation if upload is to an offline table and only 
segment metadata. Skip segment validation for
-      // realtime tables because the feature is experimental and only 
applicable to upsert enabled table currently.
-      if (tableType == TableType.OFFLINE && uploadType != 
FileUploadDownloadClient.FileUploadType.METADATA) {
-        // Validate segment
-        new SegmentValidator(_pinotHelixResourceManager, _controllerConf, 
_executor, _connectionManager,
-            _controllerMetrics, 
_leadControllerManager.isLeaderForTable(tableNameWithType))
-            .validateOfflineSegment(tableNameWithType, segmentMetadata, 
tempSegmentDir);
+      LOGGER.info("Processing upload request for segment: {} of table: {} with 
upload type: {} from client: {}, "
+          + "ingestion descriptor: {}", segmentName, tableNameWithType, 
uploadType, clientAddress, ingestionDescriptor);
+
+      // Validate segment
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        throw new ControllerApplicationException(LOGGER, "Failed to find 
table: " + tableNameWithType,
+            Response.Status.BAD_REQUEST);
+      }
+      SegmentValidationUtils.validateTimeInterval(segmentMetadata, 
tableConfig);
+      if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
+        SegmentValidationUtils.checkStorageQuota(tempSegmentDir, 
segmentMetadata, tableConfig,
+            _pinotHelixResourceManager, _controllerConf, _controllerMetrics, 
_connectionManager, _executor,
+            _leadControllerManager.isLeaderForTable(tableNameWithType));
       }
 
       // Encrypt segment
-      String crypterClassNameInTableConfig =
-          
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(tableNameWithType);
+      String crypterNameInTableConfig = 
tableConfig.getValidationConfig().getCrypterClassName();
       Pair<String, File> encryptionInfo =
           encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, 
uploadedSegmentIsEncrypted,
-              crypterClassNameInHeader, crypterClassNameInTableConfig, 
segmentName, tableNameWithType);
-
-      String crypterClassName = encryptionInfo.getLeft();
-      File finalSegmentFile = encryptionInfo.getRight();
-
-      // ZK download URI
-      String zkDownloadUri;
-      // This boolean is here for V1 segment upload, where we keep the segment 
in the downloadURI sent in the header.
-      // We will deprecate this behavior eventually.
-      if (!moveSegmentToFinalLocation) {
-        LOGGER
-            .info("Setting zkDownloadUri: to {} for segment: {} of table: {}, 
skipping move", downloadUri, segmentName,
-                tableNameWithType);
-        zkDownloadUri = downloadUri;
-      } else {
-        zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName, 
segmentName);
+              crypterClassNameInHeader, crypterNameInTableConfig, segmentName, 
tableNameWithType);
+
+      String crypterName = encryptionInfo.getLeft();
+      File segmentFile = encryptionInfo.getRight();
+
+      // Update download URI if controller is responsible for moving the 
segment to the deep store
+      URI finalSegmentLocationURI = null;
+      if (moveSegmentToFinalLocation) {
+        URI dataDirURI = provider.getDataDirURI();
+        String dataDirPath = dataDirURI.toString();
+        String encodedSegmentName = URIUtils.encode(segmentName);
+        String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, 
rawTableName, encodedSegmentName);
+        if 
(dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME))
 {
+          downloadURI = URIUtils.getPath(provider.getVip(), "segments", 
rawTableName, encodedSegmentName);
+        } else {
+          downloadURI = finalSegmentLocationPath;
+        }
+        finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
       }
+      LOGGER.info("Using download URI: {} for segment: {} of table: {} (move 
segment: {})", downloadURI, segmentFile,
+          tableNameWithType, moveSegmentToFinalLocation);
 
-      // Zk operations
-      completeZkOperations(enableParallelPushProtection, headers, 
finalSegmentFile, tableNameWithType, segmentMetadata,
-          segmentName, zkDownloadUri, moveSegmentToFinalLocation, 
crypterClassName, allowRefresh, segmentSizeInBytes);
+      ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, 
_controllerConf, _controllerMetrics);
+      zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, 
finalSegmentLocationURI, segmentFile,
+          downloadURI, crypterName, segmentSizeInBytes, 
enableParallelPushProtection, allowRefresh, headers);
 
       return new SuccessResponse("Successfully uploaded segment: " + 
segmentName + " of table: " + tableNameWithType);
     } catch (WebApplicationException e) {
@@ -338,16 +372,17 @@ public class PinotSegmentUploadDownloadRestletResource {
     return value;
   }
 
+  @VisibleForTesting
   Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File 
tempEncryptedFile,
       boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, 
String crypterClassNameInTableConfig,
       String segmentName, String tableNameWithType) {
 
-    boolean segmentNeedsEncryption = 
!Strings.isNullOrEmpty(crypterClassNameInTableConfig);
+    boolean segmentNeedsEncryption = 
StringUtils.isNotEmpty(crypterClassNameInTableConfig);
 
     // form the output
     File finalSegmentFile =
         (isUploadedSegmentEncrypted || segmentNeedsEncryption) ? 
tempEncryptedFile : tempDecryptedFile;
-    String crypterClassName = 
Strings.isNullOrEmpty(crypterClassNameInTableConfig) ? 
crypterUsedInUploadedSegment
+    String crypterClassName = 
StringUtils.isEmpty(crypterClassNameInTableConfig) ? 
crypterUsedInUploadedSegment
         : crypterClassNameInTableConfig;
     ImmutablePair<String, File> out = ImmutablePair.of(crypterClassName, 
finalSegmentFile);
 
@@ -371,19 +406,6 @@ public class PinotSegmentUploadDownloadRestletResource {
     return out;
   }
 
-  private String getZkDownloadURIForSegmentUpload(String rawTableName, String 
segmentName) {
-    ControllerFilePathProvider provider = 
ControllerFilePathProvider.getInstance();
-    URI dataDirURI = provider.getDataDirURI();
-    if 
(dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME))
 {
-      return URIUtils.constructDownloadUrl(provider.getVip(), rawTableName, 
segmentName);
-    } else {
-      // Receiving .tar.gz segment upload for pluggable storage. Download URI 
is the same as final segment location.
-      String downloadUri = URIUtils.getPath(dataDirURI.toString(), 
rawTableName, URIUtils.encode(segmentName));
-      LOGGER.info("Using download uri: {} for segment: {} of table {}", 
downloadUri, segmentName, rawTableName);
-      return downloadUri;
-    }
-  }
-
   private void downloadSegmentFileFromURI(String currentSegmentLocationURI, 
File destFile, String tableName)
       throws Exception {
     if (currentSegmentLocationURI == null || 
currentSegmentLocationURI.isEmpty()) {
@@ -406,19 +428,6 @@ public class PinotSegmentUploadDownloadRestletResource {
     return 
MetadataExtractorFactory.create(metadataProviderClass).extractMetadata(tempDecryptedFile,
 tempSegmentDir);
   }
 
-  private void completeZkOperations(boolean enableParallelPushProtection, 
HttpHeaders headers, File uploadedSegmentFile,
-      String tableNameWithType, SegmentMetadata segmentMetadata, String 
segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation, String crypter, boolean 
allowRefresh, long segmentSizeInBytes)
-      throws Exception {
-    String basePath = 
ControllerFilePathProvider.getInstance().getDataDirURI().toString();
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    URI finalSegmentLocationURI = URIUtils.getUri(basePath, rawTableName, 
URIUtils.encode(segmentName));
-    ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, 
_controllerConf, _controllerMetrics);
-    zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, 
finalSegmentLocationURI,
-        uploadedSegmentFile, enableParallelPushProtection, headers, 
zkDownloadURI, moveSegmentToFinalLocation, crypter,
-        allowRefresh, segmentSizeInBytes);
-  }
-
   private void decryptFile(String crypterClassName, File tempEncryptedFile, 
File tempDecryptedFile) {
     PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassName);
     LOGGER.info("Using crypter class {} for decrypting {} to {}", 
pinotCrypter.getClass().getName(), tempEncryptedFile,
@@ -435,7 +444,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Successfully uploaded segment"),
-      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 400, message = "Bad Request"),
+      @ApiResponse(code = 403, message = "Segment validation fails"),
+      @ApiResponse(code = 409, message = "Segment already exists or another 
parallel push in progress"),
+      @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+      @ApiResponse(code = 412, message = "CRC check fails"),
       @ApiResponse(code = 500, message = "Internal error")
   })
   // We use this endpoint with URI upload because a request sent with the 
multipart content type will reject the POST
@@ -453,9 +466,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) 
boolean allowRefresh,
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
null, enableParallelPushProtection,
-              headers, request, false, allowRefresh));
+      asyncResponse.resume(uploadSegment(tableName, 
TableType.valueOf(tableType.toUpperCase()), null, false,
+          enableParallelPushProtection, allowRefresh, headers, request));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -470,7 +482,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as 
binary")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Successfully uploaded segment"),
-      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 400, message = "Bad Request"),
+      @ApiResponse(code = 403, message = "Segment validation fails"),
+      @ApiResponse(code = 409, message = "Segment already exists or another 
parallel push in progress"),
+      @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+      @ApiResponse(code = 412, message = "CRC check fails"),
       @ApiResponse(code = 500, message = "Internal error")
   })
   // For the multipart endpoint, we will always move segment to final location 
regardless of the segment endpoint.
@@ -486,9 +502,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) 
boolean allowRefresh,
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
multiPart, enableParallelPushProtection,
-              headers, request, true, allowRefresh));
+      asyncResponse.resume(uploadSegment(tableName, 
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
+          enableParallelPushProtection, allowRefresh, headers, request));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -503,7 +518,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Successfully uploaded segment"),
-      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 400, message = "Bad Request"),
+      @ApiResponse(code = 403, message = "Segment validation fails"),
+      @ApiResponse(code = 409, message = "Segment already exists or another 
parallel push in progress"),
+      @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+      @ApiResponse(code = 412, message = "CRC check fails"),
       @ApiResponse(code = 500, message = "Internal error")
   })
   // We use this endpoint with URI upload because a request sent with the 
multipart content type will reject the POST
@@ -522,8 +541,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
null, enableParallelPushProtection,
-              headers, request, true, allowRefresh));
+          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
null, true, enableParallelPushProtection,
+              allowRefresh, headers, request));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -538,7 +557,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as 
binary")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Successfully uploaded segment"),
-      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 400, message = "Bad Request"),
+      @ApiResponse(code = 403, message = "Segment validation fails"),
+      @ApiResponse(code = 409, message = "Segment already exists or another 
parallel push in progress"),
+      @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+      @ApiResponse(code = 412, message = "CRC check fails"),
       @ApiResponse(code = 500, message = "Internal error")
   })
   // This behavior does not differ from v1 of the same endpoint.
@@ -554,9 +577,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) 
boolean allowRefresh,
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
multiPart, enableParallelPushProtection,
-              headers, request, true, allowRefresh));
+      asyncResponse.resume(uploadSegment(tableName, 
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
+          enableParallelPushProtection, allowRefresh, headers, request));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -581,9 +603,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       }
       String tableNameWithType =
           
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, tableType, LOGGER).get(0);
-      String segmentLineageEntryId = _pinotHelixResourceManager
-          .startReplaceSegments(tableNameWithType, 
startReplaceSegmentsRequest.getSegmentsFrom(),
-              startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
+      String segmentLineageEntryId = 
_pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
+          startReplaceSegmentsRequest.getSegmentsFrom(), 
startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
       return 
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", 
segmentLineageEntryId)).build();
     } catch (WebApplicationException wae) {
       throw wae;
@@ -629,8 +650,8 @@ public class PinotSegmentUploadDownloadRestletResource {
   public Response revertReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr,
-      @ApiParam(value = "Segment lineage entry id to revert", required = true)
-      @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+      @ApiParam(value = "Segment lineage entry id to revert", required = true) 
@QueryParam("segmentLineageEntryId")
+          String segmentLineageEntryId,
       @ApiParam(value = "Force revert in case the user knows that the lineage 
entry is interrupted")
       @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
     try {
@@ -652,7 +673,7 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
-  private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, 
File dstFile)
+  private static void createSegmentFileFromMultipart(FormDataMultiPart 
multiPart, File destFile)
       throws IOException {
     // Read segment file or segment metadata file and directly use that 
information to update zk
     Map<String, List<FormDataBodyPart>> segmentMetadataMap = 
multiPart.getFields();
@@ -662,12 +683,11 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
     FormDataBodyPart segmentMetadataBodyPart = 
segmentMetadataMap.values().iterator().next().get(0);
     try (InputStream inputStream = 
segmentMetadataBodyPart.getValueAs(InputStream.class);
-        OutputStream outputStream = new FileOutputStream(dstFile)) {
+        OutputStream outputStream = new FileOutputStream(destFile)) {
       IOUtils.copyLarge(inputStream, outputStream);
     } finally {
       multiPart.cleanup();
     }
-    return dstFile;
   }
 
   private FileUploadDownloadClient.FileUploadType getUploadType(String 
uploadTypeStr) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
new file mode 100644
index 0000000000..22f264d256
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.upload;
+
+import java.io.File;
+import java.util.concurrent.Executor;
+import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.controller.validation.StorageQuotaChecker;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SegmentValidationUtils provides utility methods to validate the segment 
during segment upload.
+ */
+public class SegmentValidationUtils {
+  private SegmentValidationUtils() {
+  }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentValidationUtils.class);
+
+  public static void validateTimeInterval(SegmentMetadata segmentMetadata, 
TableConfig tableConfig) {
+    Interval timeInterval = segmentMetadata.getTimeInterval();
+    if (timeInterval != null) {
+      if (!TimeUtils.isValidTimeInterval(timeInterval)) {
+        throw new ControllerApplicationException(LOGGER, String.format(
+            "Invalid segment start/end time: %s (in millis: %d/%d) for 
segment: %s of table: %s, must be between: %s",
+            timeInterval, timeInterval.getStartMillis(), 
timeInterval.getEndMillis(), segmentMetadata.getName(),
+            tableConfig.getTableName(), TimeUtils.VALID_TIME_INTERVAL), 
Response.Status.FORBIDDEN);
+      }
+    } else {
+      String timeColumn = 
tableConfig.getValidationConfig().getTimeColumnName();
+      if (timeColumn != null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Failed to find time interval in segment: %s for 
table: %s with time column: %s",
+                segmentMetadata.getName(), tableConfig.getTableName(), 
timeColumn), Response.Status.FORBIDDEN);
+      }
+    }
+  }
+
+  public static void checkStorageQuota(File segmentDir, SegmentMetadata 
segmentMetadata, TableConfig tableConfig,
+      PinotHelixResourceManager resourceManager, ControllerConf 
controllerConf, ControllerMetrics controllerMetrics,
+      HttpConnectionManager connectionManager, Executor executor, boolean 
isLeaderForTable) {
+    if (!controllerConf.getEnableStorageQuotaCheck()) {
+      return;
+    }
+    TableSizeReader tableSizeReader =
+        new TableSizeReader(executor, connectionManager, controllerMetrics, 
resourceManager);
+    StorageQuotaChecker quotaChecker =
+        new StorageQuotaChecker(tableConfig, tableSizeReader, 
controllerMetrics, isLeaderForTable, resourceManager);
+    StorageQuotaChecker.QuotaCheckerResponse response;
+    try {
+      response =
+          quotaChecker.isSegmentStorageWithinQuota(segmentMetadata.getName(), 
FileUtils.sizeOfDirectory(segmentDir),
+              controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Caught exception while checking the storage quota for 
segment: %s of table: %s",
+              segmentMetadata.getName(), tableConfig.getTableName()), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+    if (!response._isSegmentWithinQuota) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Storage quota check failed for segment: %s of table: 
%s, reason: %s",
+              segmentMetadata.getName(), tableConfig.getTableName(), 
response._reason), Response.Status.FORBIDDEN);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
deleted file mode 100644
index 3e34afdc67..0000000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.api.upload;
-
-import java.io.File;
-import java.util.concurrent.Executor;
-import javax.ws.rs.core.Response;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.exception.InvalidConfigException;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerConf;
-import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.util.TableSizeReader;
-import org.apache.pinot.controller.validation.StorageQuotaChecker;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.joda.time.Interval;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * SegmentValidator is a util class used during segment upload. It does 
verification such as a quota check and
- * validating
- * that the segment time values stored in the segment are valid.
- */
-public class SegmentValidator {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentValidator.class);
-  private final PinotHelixResourceManager _pinotHelixResourceManager;
-  private final ControllerConf _controllerConf;
-  private final Executor _executor;
-  private final HttpConnectionManager _connectionManager;
-  private final ControllerMetrics _controllerMetrics;
-  private final boolean _isLeaderForTable;
-
-  public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, 
ControllerConf controllerConf,
-      Executor executor, HttpConnectionManager connectionManager, 
ControllerMetrics controllerMetrics,
-      boolean isLeaderForTable) {
-    _pinotHelixResourceManager = pinotHelixResourceManager;
-    _controllerConf = controllerConf;
-    _executor = executor;
-    _connectionManager = connectionManager;
-    _controllerMetrics = controllerMetrics;
-    _isLeaderForTable = isLeaderForTable;
-  }
-
-  public void validateOfflineSegment(String offlineTableName, SegmentMetadata 
segmentMetadata, File tempSegmentDir) {
-    TableConfig offlineTableConfig =
-        
ZKMetadataProvider.getOfflineTableConfig(_pinotHelixResourceManager.getPropertyStore(),
 offlineTableName);
-    if (offlineTableConfig == null) {
-      throw new ControllerApplicationException(LOGGER, "Failed to find table 
config for table: " + offlineTableName,
-          Response.Status.NOT_FOUND);
-    }
-
-    String segmentName = segmentMetadata.getName();
-    StorageQuotaChecker.QuotaCheckerResponse quotaResponse;
-    try {
-      quotaResponse = checkStorageQuota(tempSegmentDir, segmentMetadata, 
offlineTableConfig);
-    } catch (InvalidConfigException e) {
-      // Admin port is missing, return response with 500 status code.
-      throw new ControllerApplicationException(LOGGER,
-          "Quota check failed for segment: " + segmentName + " of table: " + 
offlineTableName + ", reason: " + e
-              .getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
-    }
-    if (!quotaResponse._isSegmentWithinQuota) {
-      throw new ControllerApplicationException(LOGGER,
-          "Quota check failed for segment: " + segmentName + " of table: " + 
offlineTableName + ", reason: "
-              + quotaResponse._reason, Response.Status.FORBIDDEN);
-    }
-
-    // Check time interval
-    // TODO: Pass in schema and check the existence of time interval when time 
field exists
-    Interval timeInterval = segmentMetadata.getTimeInterval();
-    if (timeInterval != null && !TimeUtils.isValidTimeInterval(timeInterval)) {
-      throw new ControllerApplicationException(LOGGER, String.format(
-          "Invalid segment start/end time: %s (in millis: %d/%d) for segment: 
%s of table: %s, must be between: %s",
-          timeInterval, timeInterval.getStartMillis(), 
timeInterval.getEndMillis(), segmentName, offlineTableName,
-          TimeUtils.VALID_TIME_INTERVAL), Response.Status.NOT_ACCEPTABLE);
-    }
-  }
-
-  /**
-   * check if the segment represented by segmentFile is within the storage 
quota
-   * @param segmentFile untarred segment. This should not be null.
-   *                    segmentFile must exist on disk and must be a directory
-   * @param metadata segment metadata. This should not be null.
-   * @param offlineTableConfig offline table configuration. This should not be 
null.
-   */
-  private StorageQuotaChecker.QuotaCheckerResponse checkStorageQuota(File 
segmentFile, SegmentMetadata metadata,
-      TableConfig offlineTableConfig)
-      throws InvalidConfigException {
-    if (!_controllerConf.getEnableStorageQuotaCheck()) {
-      return StorageQuotaChecker.success("Quota check is disabled");
-    }
-    TableSizeReader tableSizeReader =
-        new TableSizeReader(_executor, _connectionManager, _controllerMetrics, 
_pinotHelixResourceManager);
-    StorageQuotaChecker quotaChecker = new 
StorageQuotaChecker(offlineTableConfig, tableSizeReader,
-        _controllerMetrics, _isLeaderForTable, _pinotHelixResourceManager);
-    return quotaChecker.isSegmentStorageWithinQuota(metadata.getName(), 
FileUtils.sizeOfDirectory(segmentFile),
-        _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 2e7ba5c76e..7a12d9e5eb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.upload;
 
 import java.io.File;
 import java.net.URI;
+import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.helix.ZNRecord;
@@ -33,7 +34,6 @@ import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,26 +60,24 @@ public class ZKOperator {
   }
 
   public void completeSegmentOperations(String tableNameWithType, 
SegmentMetadata segmentMetadata,
-      URI finalSegmentLocationURI, File currentSegmentLocation, boolean 
enableParallelPushProtection,
-      HttpHeaders headers, String zkDownloadURI, boolean 
moveSegmentToFinalLocation, String crypter,
-      boolean allowRefresh, long segmentSizeInBytes)
+      @Nullable URI finalSegmentLocationURI, File segmentFile, String 
downloadUrl, @Nullable String crypterName,
+      long segmentSizeInBytes, boolean enableParallelPushProtection, boolean 
allowRefresh, HttpHeaders headers)
       throws Exception {
     String segmentName = segmentMetadata.getName();
-    ZNRecord segmentMetadataZNRecord =
+    ZNRecord existingSegmentMetadataZNRecord =
         
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, 
segmentName);
     boolean refreshOnly =
         
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
-    if (segmentMetadataZNRecord == null) {
+    if (existingSegmentMetadataZNRecord == null) {
       // Add a new segment
       if (refreshOnly) {
         throw new ControllerApplicationException(LOGGER,
-            "Cannot refresh non-existing segment, aborted uploading segment: " 
+ segmentName + " of table: "
-                + tableNameWithType, Response.Status.GONE);
+            String.format("Cannot refresh non-existing segment: %s for table: 
%s", segmentName, tableNameWithType),
+            Response.Status.GONE);
       }
-      LOGGER.info("Adding new segment {} from table {}", segmentName, 
tableNameWithType);
-      processNewSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation, zkDownloadURI, headers,
-          crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, 
enableParallelPushProtection,
-          segmentSizeInBytes);
+      LOGGER.info("Adding new segment: {} to table: {}", segmentName, 
tableNameWithType);
+      processNewSegment(tableNameWithType, segmentMetadata, 
finalSegmentLocationURI, segmentFile, downloadUrl,
+          crypterName, segmentSizeInBytes, enableParallelPushProtection, 
headers);
     } else {
       // Refresh an existing segment
       if (!allowRefresh) {
@@ -87,26 +85,27 @@ public class ZKOperator {
         // done up-front but ends up getting created before the check here, we 
could incorrectly refresh an existing
         // segment.
         throw new ControllerApplicationException(LOGGER,
-            "Segment: " + segmentName + " already exists in table: " + 
tableNameWithType + ". Refresh not permitted.",
-            Response.Status.CONFLICT);
+            String.format("Segment: %s already exists in table: %s. Refresh 
not permitted.", segmentName,
+                tableNameWithType), Response.Status.CONFLICT);
       }
-      LOGGER.info("Segment {} from table {} already exists, refreshing if 
necessary", segmentName, tableNameWithType);
-      processExistingSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation,
-          enableParallelPushProtection, headers, zkDownloadURI, crypter, 
tableNameWithType, segmentName,
-          segmentMetadataZNRecord, moveSegmentToFinalLocation, 
segmentSizeInBytes);
+      LOGGER.info("Segment: {} already exists in table: {}, refreshing it", 
segmentName, tableNameWithType);
+      processExistingSegment(tableNameWithType, segmentMetadata, 
existingSegmentMetadataZNRecord,
+          finalSegmentLocationURI, segmentFile, downloadUrl, crypterName, 
segmentSizeInBytes,
+          enableParallelPushProtection, headers);
     }
   }
 
-  private void processExistingSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
-      File currentSegmentLocation, boolean enableParallelPushProtection, 
HttpHeaders headers, String downloadUrl,
-      String crypter, String tableNameWithType, String segmentName, ZNRecord 
znRecord,
-      boolean moveSegmentToFinalLocation, long segmentSizeInBytes)
+  private void processExistingSegment(String tableNameWithType, 
SegmentMetadata segmentMetadata,
+      ZNRecord existingSegmentMetadataZNRecord, @Nullable URI 
finalSegmentLocationURI, File segmentFile,
+      String downloadUrl, @Nullable String crypterName, long 
segmentSizeInBytes, boolean enableParallelPushProtection,
+      HttpHeaders headers)
       throws Exception {
-    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
-    long existingCrc = segmentZKMetadata.getCrc();
-    int expectedVersion = znRecord.getVersion();
+    String segmentName = segmentMetadata.getName();
+    int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
 
     // Check if CRC match when IF-MATCH header is set
+    SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(existingSegmentMetadataZNRecord);
+    long existingCrc = segmentZKMetadata.getCrc();
     checkCRC(headers, tableNameWithType, segmentName, existingCrc);
 
     // Check segment upload start time when parallel push protection enabled
@@ -122,8 +121,8 @@ public class ZKOperator {
         } else {
           // Another segment upload is in progress
           throw new ControllerApplicationException(LOGGER,
-              "Another segment upload is in progress for segment: " + 
segmentName + " of table: " + tableNameWithType
-                  + ", retry later", Response.Status.CONFLICT);
+              String.format("Another segment upload is in progress for 
segment: %s of table: %s, retry later",
+                  segmentName, tableNameWithType), Response.Status.CONFLICT);
         }
       }
 
@@ -131,7 +130,7 @@ public class ZKOperator {
       segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
       if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
         throw new ControllerApplicationException(LOGGER,
-            "Failed to lock the segment: " + segmentName + " of table: " + 
tableNameWithType + ", retry later",
+            String.format("Failed to lock the segment: %s of table: %s, retry 
later", segmentName, tableNameWithType),
             Response.Status.CONFLICT);
       } else {
         // The version will increment if the zk metadata update is successful
@@ -179,13 +178,10 @@ public class ZKOperator {
         LOGGER.info(
             "New segment crc {} is different than the existing segment crc {}. 
Updating ZK metadata and refreshing "
                 + "segment {}", newCrc, existingCrc, segmentName);
-        if (moveSegmentToFinalLocation) {
-          moveSegmentToPermanentDirectory(currentSegmentLocation, 
finalSegmentLocationURI);
-          LOGGER.info("Moved segment {} from temp location {} to {}", 
segmentName,
-              currentSegmentLocation.getAbsolutePath(), 
finalSegmentLocationURI.getPath());
-        } else {
-          LOGGER.info("Skipping segment move, keeping segment {} from table {} 
at {}", segmentName, tableNameWithType,
-              downloadUrl);
+        if (finalSegmentLocationURI != null) {
+          moveSegmentToPermanentDirectory(segmentFile, 
finalSegmentLocationURI);
+          LOGGER.info("Moved segment: {} of table: {} to final location: {}", 
segmentName, tableNameWithType,
+              finalSegmentLocationURI);
         }
 
         // NOTE: Must first set the segment ZK metadata before trying to 
refresh because servers and brokers rely on
@@ -196,11 +192,11 @@ public class ZKOperator {
           // If no modifier is provided, use the custom map from the segment 
metadata
           segmentZKMetadata.setCustomMap(null);
           ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, 
segmentZKMetadata, segmentMetadata, downloadUrl,
-              crypter, segmentSizeInBytes);
+              crypterName, segmentSizeInBytes);
         } else {
           // If modifier is provided, first set the custom map from the 
segment metadata, then apply the modifier
           ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, 
segmentZKMetadata, segmentMetadata, downloadUrl,
-              crypter, segmentSizeInBytes);
+              crypterName, segmentSizeInBytes);
           
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
         }
         if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
@@ -222,7 +218,7 @@ public class ZKOperator {
     }
   }
 
-  private void checkCRC(HttpHeaders headers, String offlineTableName, String 
segmentName, long existingCrc) {
+  private void checkCRC(HttpHeaders headers, String tableNameWithType, String 
segmentName, long existingCrc) {
     String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
     if (expectedCrcStr != null) {
       long expectedCrc;
@@ -230,24 +226,24 @@ public class ZKOperator {
         expectedCrc = Long.parseLong(expectedCrcStr);
       } catch (NumberFormatException e) {
         throw new ControllerApplicationException(LOGGER,
-            "Caught exception for segment: " + segmentName + " of table: " + 
offlineTableName
-                + " while parsing IF-MATCH CRC: \"" + expectedCrcStr + "\"", 
Response.Status.PRECONDITION_FAILED);
+            String.format("Caught exception for segment: %s of table: %s while 
parsing IF-MATCH CRC: \"%s\"",
+                segmentName, tableNameWithType, expectedCrcStr), 
Response.Status.PRECONDITION_FAILED);
       }
       if (expectedCrc != existingCrc) {
         throw new ControllerApplicationException(LOGGER,
-            "For segment: " + segmentName + " of table: " + offlineTableName + 
", expected CRC: " + expectedCrc
-                + " does not match existing CRC: " + existingCrc, 
Response.Status.PRECONDITION_FAILED);
+            String.format("For segment: %s of table: %s, expected CRC: %d does 
not match existing CRC: %d", segmentName,
+                tableNameWithType, expectedCrc, existingCrc), 
Response.Status.PRECONDITION_FAILED);
       }
     }
   }
 
-  private void processNewSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
-      File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, 
String crypter, String tableNameWithType,
-      String segmentName, boolean moveSegmentToFinalLocation, boolean 
enableParallelPushProtection,
-      long segmentSizeInBytes)
+  private void processNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata,
+      @Nullable URI finalSegmentLocationURI, File segmentFile, String 
downloadUrl, @Nullable String crypterName,
+      long segmentSizeInBytes, boolean enableParallelPushProtection, 
HttpHeaders headers)
       throws Exception {
+    String segmentName = segmentMetadata.getName();
     SegmentZKMetadata newSegmentZKMetadata =
-        ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, 
segmentMetadata, zkDownloadURI, crypter,
+        ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, 
segmentMetadata, downloadUrl, crypterName,
             segmentSizeInBytes);
 
     // Lock if enableParallelPushProtection is true.
@@ -266,15 +262,14 @@ public class ZKOperator {
     }
     if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, 
newSegmentZKMetadata)) {
       throw new RuntimeException(
-          "Failed to create ZK metadata for segment: " + segmentName + " of 
table: " + tableNameWithType);
+          String.format("Failed to create ZK metadata for segment: %s of 
table: %s", segmentName, tableNameWithType));
     }
 
-    // For v1 segment uploads, we will not move the segment
-    if (moveSegmentToFinalLocation) {
+    if (finalSegmentLocationURI != null) {
       try {
-        moveSegmentToPermanentDirectory(currentSegmentLocation, 
finalSegmentLocationURI);
-        LOGGER.info("Moved segment {} from temp location {} to {}", 
segmentName,
-            currentSegmentLocation.getAbsolutePath(), 
finalSegmentLocationURI.getPath());
+        moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
+        LOGGER.info("Moved segment: {} of table: {} to final location: {}", 
segmentName, tableNameWithType,
+            finalSegmentLocationURI);
       } catch (Exception e) {
         // Cleanup the Zk entry and the segment from the permanent directory 
if it exists.
         LOGGER.error("Could not move segment {} from table {} to permanent 
directory", segmentName, tableNameWithType,
@@ -283,9 +278,6 @@ public class ZKOperator {
         LOGGER.info("Deleted zk entry and segment {} for table {}.", 
segmentName, tableNameWithType);
         throw e;
       }
-    } else {
-      LOGGER.info("Skipping segment move, keeping segment {} from table {} at 
{}", segmentName, tableNameWithType,
-          zkDownloadURI);
     }
 
     try {
@@ -306,18 +298,14 @@ public class ZKOperator {
         _pinotHelixResourceManager.deleteSegment(tableNameWithType, 
segmentName);
         LOGGER.info("Deleted zk entry and segment {} for table {}.", 
segmentName, tableNameWithType);
         throw new RuntimeException(
-            "Failed to update ZK metadata for segment: " + segmentName + " of 
table: " + tableNameWithType);
+            String.format("Failed to update ZK metadata for segment: %s of 
table: %s", segmentName, tableNameWithType));
       }
     }
   }
 
-  private void moveSegmentToPermanentDirectory(File currentSegmentLocation, 
URI finalSegmentLocationURI)
+  private void moveSegmentToPermanentDirectory(File segmentFile, URI 
finalSegmentLocationURI)
       throws Exception {
-    PinotFS pinotFS = 
PinotFSFactory.create(finalSegmentLocationURI.getScheme());
-
-    // Overwrites current segment file
-    LOGGER.info("Copying segment from {} to {}", 
currentSegmentLocation.getAbsolutePath(),
-        finalSegmentLocationURI.toString());
-    pinotFS.copyFromLocalFile(currentSegmentLocation, finalSegmentLocationURI);
+    LOGGER.info("Copying segment from: {} to: {}", 
segmentFile.getAbsolutePath(), finalSegmentLocationURI);
+    
PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile,
 finalSegmentLocationURI);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 4c9f61fc4a..f9d0e86447 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -689,18 +689,6 @@ public class PinotHelixResourceManager {
     }
   }
 
-  /**
-   * Returns the crypter class name defined in the table config for the given 
table.
-   *
-   * @param tableNameWithType Table name with type suffix
-   * @return crypter class name
-   */
-  public String getCrypterClassNameFromTableConfig(String tableNameWithType) {
-    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
-    Preconditions.checkNotNull(tableConfig, "Table config is not available for 
table '%s'", tableNameWithType);
-    return tableConfig.getValidationConfig().getCrypterClassName();
-  }
-
   /**
    * Table related APIs
    */
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index cae0b48949..2d07e20be0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -459,10 +459,6 @@ public class PinotLLCRealtimeSegmentManager {
   public void commitSegmentFile(String realtimeTableName, 
CommittingSegmentDescriptor committingSegmentDescriptor)
       throws Exception {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
-    if (isPeerSegmentDownloadScheme(committingSegmentDescriptor)) {
-      LOGGER.info("No moving needed for segment on peer servers: {}", 
committingSegmentDescriptor.getSegmentLocation());
-      return;
-    }
 
     String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
     String segmentName = committingSegmentDescriptor.getSegmentName();
@@ -470,6 +466,12 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Copy the segment file to the controller
     String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
+    Preconditions.checkArgument(segmentLocation != null, "Segment location 
must be provided");
+    if (segmentLocation.regionMatches(true, 0, 
CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, 0,
+        CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME.length())) {
+      LOGGER.info("No moving needed for segment on peer servers: {}", 
segmentLocation);
+      return;
+    }
     URI segmentFileURI = URIUtils.getUri(segmentLocation);
     URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), 
rawTableName);
     URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(), 
rawTableName, URIUtils.encode(segmentName));
@@ -494,12 +496,6 @@ public class PinotLLCRealtimeSegmentManager {
     committingSegmentDescriptor.setSegmentLocation(uriToMoveTo.toString());
   }
 
-  private boolean isPeerSegmentDownloadScheme(CommittingSegmentDescriptor 
committingSegmentDescriptor) {
-    return !(committingSegmentDescriptor == null) && 
!(committingSegmentDescriptor.getSegmentLocation() == null)
-        && committingSegmentDescriptor.getSegmentLocation().toLowerCase()
-        .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
-  }
-
   /**
    * This method is invoked after the realtime segment is uploaded but before 
a response is sent to the server.
    * It updates the propertystore segment metadata from IN_PROGRESS to DONE, 
and also creates new propertystore
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 2628d9f6bd..11e67b1663 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -38,9 +38,9 @@ public class ZKMetadataUtils {
    * Creates the segment ZK metadata for a new segment.
    */
   public static SegmentZKMetadata createSegmentZKMetadata(String 
tableNameWithType, SegmentMetadata segmentMetadata,
-      String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
+      String downloadUrl, @Nullable String crypterName, long 
segmentSizeInBytes) {
     SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadata.getName());
-    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypterName,
         segmentSizeInBytes);
     segmentZKMetadata.setPushTime(System.currentTimeMillis());
     return segmentZKMetadata;
@@ -50,14 +50,14 @@ public class ZKMetadataUtils {
    * Refreshes the segment ZK metadata for a segment being replaced.
    */
   public static void refreshSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
-      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
-    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypter,
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypterName, long segmentSizeInBytes) {
+    updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypterName,
         segmentSizeInBytes);
     segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
   }
 
   private static void updateSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
-      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypter, long segmentSizeInBytes) {
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypterName, long segmentSizeInBytes) {
     if (segmentMetadata.getTimeInterval() != null) {
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
       segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
@@ -77,7 +77,7 @@ public class ZKMetadataUtils {
     segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
     segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
     segmentZKMetadata.setDownloadUrl(downloadUrl);
-    segmentZKMetadata.setCrypterName(crypter);
+    segmentZKMetadata.setCrypterName(crypterName);
 
     // Set partition metadata
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index c9a119b6a5..d4951587ce 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -43,6 +43,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+
 public class ZKOperatorTest {
   private static final String TABLE_NAME = "operatorTestTable";
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
@@ -70,15 +71,13 @@ public class ZKOperatorTest {
 
     // Test if Zk segment metadata is removed if exception is thrown when 
moving segment to final location.
     try {
-      // Create mock finalSegmentLocationURI and currentSegmentLocation.
+      // Create mock finalSegmentLocationURI and segmentFile.
       URI finalSegmentLocationURI =
           URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME, 
URIUtils.encode(segmentMetadata.getName()));
-      File currentSegmentLocation = new File(new File("foo/bar"), "mockChild");
+      File segmentFile = new File(new File("foo/bar"), "mockChild");
 
-      zkOperator
-          .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
finalSegmentLocationURI,
-              currentSegmentLocation, true, httpHeaders, "downloadUrl",
-              true, "crypter", true, 10);
+      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, 
segmentMetadata, finalSegmentLocationURI, segmentFile,
+          "downloadUrl", "crypter", 10, true, true, httpHeaders);
       fail();
     } catch (Exception e) {
       // Expected
@@ -91,9 +90,8 @@ public class ZKOperatorTest {
       return segmentZKMetadata == null;
     }, 30_000L, "Failed to delete segmentZkMetadata.");
 
-    zkOperator
-        .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, 
null, true, httpHeaders, "downloadUrl",
-            false, "crypter", true, 10);
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, "downloadUrl", "crypter", 10,
+        true, true, httpHeaders);
 
     SegmentZKMetadata segmentZKMetadata =
         
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
 SEGMENT_NAME);
@@ -110,8 +108,8 @@ public class ZKOperatorTest {
 
     // Upload the same segment with allowRefresh = false. Validate that an 
exception is thrown.
     try {
-      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, 
segmentMetadata, null, null, false, httpHeaders,
-          "otherDownloadUrl", false, "otherCrypter", false, 10);
+      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, 
segmentMetadata, null, null, "otherDownloadUrl",
+          "otherCrypter", 10, true, false, httpHeaders);
       fail();
     } catch (Exception e) {
       // Expected
@@ -120,8 +118,8 @@ public class ZKOperatorTest {
     // Refresh the segment with unmatched IF_MATCH field
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
     try {
-      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, 
segmentMetadata, null, null, true, httpHeaders,
-          "otherDownloadUrl", false, null, true, 10);
+      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, 
segmentMetadata, null, null, "otherDownloadUrl",
+          "otherCrypter", 10, true, true, httpHeaders);
       fail();
     } catch (Exception e) {
       // Expected
@@ -131,10 +129,11 @@ public class ZKOperatorTest {
     // downloadURL and crypter
     
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
-    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, true, httpHeaders,
-        "otherDownloadUrl", false, "otherCrypter", true, 10);
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, "otherDownloadUrl",
+        "otherCrypter", 10, true, true, httpHeaders);
     segmentZKMetadata =
         
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
 SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
     // Push time should not change
     assertEquals(segmentZKMetadata.getPushTime(), pushTime);
@@ -152,13 +151,12 @@ public class ZKOperatorTest {
     when(segmentMetadata.getCrc()).thenReturn("23456");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(789L);
     // Add a tiny sleep to guarantee that refresh time is different from the 
previous round
-    // 1 second delay to avoid "org.apache.helix.HelixException: Specified 
EXTERNALVIEW operatorTestTable_OFFLINE is
-    // not found!" exception from being thrown sporadically.
-    Thread.sleep(1000L);
-    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, true, httpHeaders,
-        "otherDownloadUrl", false, "otherCrypter", true, 10);
+    Thread.sleep(10);
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, "otherDownloadUrl",
+        "otherCrypter", 100, true, true, httpHeaders);
     segmentZKMetadata =
         
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
 SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
     assertEquals(segmentZKMetadata.getCrc(), 23456L);
     // Push time should not change
     assertEquals(segmentZKMetadata.getPushTime(), pushTime);
@@ -167,7 +165,7 @@ public class ZKOperatorTest {
     assertTrue(segmentZKMetadata.getRefreshTime() > refreshTime);
     assertEquals(segmentZKMetadata.getDownloadUrl(), "otherDownloadUrl");
     assertEquals(segmentZKMetadata.getCrypterName(), "otherCrypter");
-    assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+    assertEquals(segmentZKMetadata.getSizeInBytes(), 100);
   }
 
   @AfterClass
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index f52cc2d924..2be71ffd25 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -180,7 +180,7 @@ public class SegmentCompletionTest {
 
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
+        .withSegmentName(_segmentNameStr).withSegmentLocation("location");
     response = _segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -257,7 +257,7 @@ public class SegmentCompletionTest {
 
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
+        .withSegmentName(_segmentNameStr).withSegmentLocation("location");
     response = _segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -614,7 +614,7 @@ public class SegmentCompletionTest {
 
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
+        .withSegmentName(_segmentNameStr).withSegmentLocation("location");
     response = _segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -690,7 +690,7 @@ public class SegmentCompletionTest {
 
     _segmentCompletionMgr._seconds += 5;
     params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
-        .withSegmentName(_segmentNameStr);
+        .withSegmentName(_segmentNameStr).withSegmentLocation("location");
     response = _segmentCompletionMgr
         .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -977,7 +977,7 @@ public class SegmentCompletionTest {
     // Commit in 15s
     _segmentCompletionMgr._seconds += 15;
     params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
-        .withSegmentName(_segmentNameStr);
+        .withSegmentName(_segmentNameStr).withSegmentLocation("location");
     response = _segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     long commitTimeMs = (_segmentCompletionMgr._seconds - startTime) * 1000;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to