This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch SegmentProcessorFrameworkImprovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 002219055986a8737601688ca2f379d889b0ec65
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Thu Jul 25 11:16:08 2024 -0700

    Added support to upload segments in batch mode with METADATA upload type 
(#13690)
---
 .../common/utils/FileUploadDownloadClient.java     |   7 +
 .../PinotSegmentUploadDownloadRestletResource.java | 355 ++++++++++++++++++++-
 .../api/upload/SegmentUploadMetadata.java          | 117 +++++++
 .../pinot/controller/api/upload/ZKOperator.java    | 321 +++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  78 +++++
 .../SegmentUploadIntegrationTest.java              | 104 +++++-
 .../BaseMultipleSegmentsConversionExecutor.java    | 212 +++++++-----
 .../segment/local/utils/SegmentPushUtils.java      | 130 ++++++++
 .../spi/ingestion/batch/spec/PushJobSpec.java      |  15 +
 9 files changed, 1255 insertions(+), 84 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4123e3157e..4a2cb33be2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -113,6 +113,7 @@ public class FileUploadDownloadClient implements 
AutoCloseable {
   private static final String SCHEMA_PATH = "/schemas";
   private static final String OLD_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_PATH = "/v2/segments";
+  private static final String SEGMENT_UPLOAD_BATCH_PATH = "/v3/segments";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "type=";
   private static final String START_REPLACE_SEGMENTS_PATH = 
"/startReplaceSegments";
@@ -365,6 +366,12 @@ public class FileUploadDownloadClient implements 
AutoCloseable {
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), 
controllerURI.getPort(), SEGMENT_PATH);
   }
 
+  public static URI getUploadSegmentBatchURI(URI controllerURI)
+      throws URISyntaxException {
+    return getURI(controllerURI.getScheme(), controllerURI.getHost(), 
controllerURI.getPort(),
+        SEGMENT_UPLOAD_BATCH_PATH);
+  }
+
   public static URI getStartReplaceSegmentsURI(URI controllerURI, String 
rawTableName, String tableType,
       boolean forceCleanup)
       throws URISyntaxException {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 156a3e9095..2f5082e5b2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -29,14 +29,20 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -59,6 +65,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.io.Charsets;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -73,6 +80,7 @@ import 
org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.controller.ControllerConf;
@@ -81,6 +89,7 @@ import 
org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.upload.SegmentUploadMetadata;
 import org.apache.pinot.controller.api.upload.SegmentValidationUtils;
 import org.apache.pinot.controller.api.upload.ZKOperator;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -101,6 +110,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.glassfish.grizzly.http.server.Request;
+import org.glassfish.jersey.media.multipart.BodyPart;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.glassfish.jersey.server.ManagedAsync;
@@ -295,13 +305,18 @@ public class PinotSegmentUploadDownloadRestletResource {
               extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
           copySegmentToFinalLocation = 
Boolean.parseBoolean(copySegmentToDeepStore);
           createSegmentFileFromMultipart(multiPart, destFile);
+          PinotFS pinotFS = null;
           try {
             URI segmentURI = new URI(sourceDownloadURIStr);
-            PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
+            pinotFS = PinotFSFactory.create(segmentURI.getScheme());
             segmentSizeInBytes = pinotFS.length(segmentURI);
           } catch (Exception e) {
             segmentSizeInBytes = -1;
             LOGGER.warn("Could not fetch segment size for metadata push", e);
+          } finally {
+            if (pinotFS != null) {
+              pinotFS.close();
+            }
           }
           break;
         default:
@@ -403,6 +418,234 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
+  // Method used to update a list of segments in batch mode with the METADATA 
upload type.
+  private SuccessResponse uploadSegments(String tableName, TableType 
tableType, FormDataMultiPart multiParts,
+      boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders 
headers, Request request) {
+    long segmentsUploadStartTimeMs = System.currentTimeMillis();
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String tableNameWithType = tableType == TableType.OFFLINE ? 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+        : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to fetch table 
config for table: " + tableNameWithType,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String clientAddress;
+    try {
+      clientAddress = 
InetAddress.getByName(request.getRemoteAddr()).getHostName();
+    } catch (UnknownHostException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to resolve 
hostname from input request",
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    String uploadTypeStr = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+    FileUploadType uploadType = getUploadType(uploadTypeStr);
+    if (!FileUploadType.METADATA.equals(uploadType)) {
+      throw new ControllerApplicationException(LOGGER, "Unsupported upload 
type: " + uploadTypeStr,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String crypterClassNameInHeader = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.CRYPTER);
+    String ingestionDescriptor = extractHttpHeader(headers, 
CommonConstants.Controller.INGESTION_DESCRIPTOR);
+    ControllerFilePathProvider provider = 
ControllerFilePathProvider.getInstance();
+    List<SegmentUploadMetadata> segmentUploadMetadataList = new ArrayList<>();
+    List<File> tempFiles = new ArrayList<>();
+    List<String> segmentNames = new ArrayList<>();
+    List<BodyPart> bodyParts = multiParts.getBodyParts();
+    LOGGER.info("Uploading segments in batch mode of size: {}", 
bodyParts.size());
+
+    // there would be just one body part
+    FormDataBodyPart bodyPartFromReq = (FormDataBodyPart) bodyParts.get(0);
+    String uuid = UUID.randomUUID().toString();
+    File allSegmentsMetadataTarFile = new File(FileUtils.getTempDirectory(), 
"allSegmentsMetadataTar-" + uuid
+        + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    try {
+      createSegmentFileFromBodyPart(bodyPartFromReq, 
allSegmentsMetadataTarFile);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to extract 
segment metadata files from the input "
+          + "request. ", Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+
+    List<File> segmentMetadataFiles = new ArrayList<>();
+    File allSegmentsMetadataDir = new File(FileUtils.getTempDirectory(), 
"allSegmentsMetadataDir-" + uuid);
+    try {
+      FileUtils.forceMkdir(allSegmentsMetadataDir);
+      List<File> metadataFiles = 
TarGzCompressionUtils.untar(allSegmentsMetadataTarFile, allSegmentsMetadataDir);
+      if (!metadataFiles.isEmpty()) {
+        segmentMetadataFiles.addAll(metadataFiles);
+      }
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to unzip the 
segment metadata files. ",
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+
+    // segmentName, (creation.meta, metadata.properties)
+    Map<String, SegmentMetadataInfo> segmentMetadataFileMap = new HashMap<>();
+    // segmentName, segmentDownloadURI
+    Map<String, String> segmentURIMap = new HashMap<>();
+    for (File file: segmentMetadataFiles) {
+      String fileName = file.getName();
+      if (fileName.equalsIgnoreCase("all_segments_metadata")) {
+        try (InputStream inputStream = FileUtils.openInputStream(file)) {
+          final InputStreamReader reader = new InputStreamReader(inputStream, 
Charsets.toCharset(
+              StandardCharsets.UTF_8));
+          try (BufferedReader bufReader = IOUtils.toBufferedReader(reader)) {
+            String segmentNameLine;
+            String segmentURILine;
+            while ((segmentNameLine = bufReader.readLine()) != null) {
+              segmentURILine = bufReader.readLine();
+              segmentURIMap.put(segmentNameLine, segmentURILine);
+            }
+          }
+        } catch (IOException e) {
+          throw new ControllerApplicationException(LOGGER, "Failed to read the 
all_segment_metadata file. ",
+              Response.Status.INTERNAL_SERVER_ERROR, e);
+        }
+      } else if (fileName.endsWith(".creation.meta")) {
+        int suffixLength = ".creation.meta".length();
+        String segmentName = fileName.substring(0, fileName.length() - 
suffixLength);
+        SegmentMetadataInfo segmentMetadataInfo = 
segmentMetadataFileMap.getOrDefault(segmentName,
+            new SegmentMetadataInfo());
+        segmentMetadataInfo.setSegmentCreationMetaFile(file);
+        segmentMetadataFileMap.put(segmentName, segmentMetadataInfo);
+      } else if (fileName.endsWith(".metadata.properties")) {
+        int suffixLength = ".metadata.properties".length();
+        String segmentName = fileName.substring(0, fileName.length() - 
suffixLength);
+        SegmentMetadataInfo segmentMetadataInfo = 
segmentMetadataFileMap.getOrDefault(segmentName,
+            new SegmentMetadataInfo());
+        segmentMetadataInfo.setSegmentPropertiesFile(file);
+        segmentMetadataFileMap.put(segmentName, segmentMetadataInfo);
+      }
+    }
+
+    try {
+      int entryCount = 0;
+      for (Map.Entry<String, SegmentMetadataInfo> entry: 
segmentMetadataFileMap.entrySet()) {
+        String segmentName = entry.getKey();
+        SegmentMetadataInfo segmentMetadataInfo = entry.getValue();
+        segmentNames.add(segmentName);
+        File tempEncryptedFile;
+        File tempDecryptedFile;
+        File tempSegmentDir;
+        String sourceDownloadURIStr = segmentURIMap.get(segmentName);
+        if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+          throw new ControllerApplicationException(LOGGER,
+              "'DOWNLOAD_URI' is required as a field within the multipart 
object for METADATA batch upload mode.",
+              Response.Status.BAD_REQUEST);
+        }
+        // The downloadUri for putting into segment zk metadata
+        String segmentDownloadURIStr = sourceDownloadURIStr;
+
+        String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
+        tempEncryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName + ENCRYPTED_SUFFIX);
+        tempFiles.add(tempEncryptedFile);
+        tempDecryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName);
+        tempFiles.add(tempDecryptedFile);
+        tempSegmentDir = new File(provider.getUntarredFileTempDir(), 
tempFileName);
+        tempFiles.add(tempSegmentDir);
+        boolean encryptSegment = 
StringUtils.isNotEmpty(crypterClassNameInHeader);
+        File destFile = encryptSegment ? tempEncryptedFile : tempDecryptedFile;
+        // override copySegmentToFinalLocation if override provided in 
headers:COPY_SEGMENT_TO_DEEP_STORE
+        // else set to false for backward compatibility
+        String copySegmentToDeepStore =
+            extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+        boolean copySegmentToFinalLocation = 
Boolean.parseBoolean(copySegmentToDeepStore);
+        createSegmentFileFromSegmentMetadataInfo(segmentMetadataInfo, 
destFile);
+        // TODO: Include the untarred segment size when using the METADATA 
push rest API. Currently we can only use the
+        //  tarred segment size as an approximation.
+        long segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr);
+
+        if (encryptSegment) {
+          decryptFile(crypterClassNameInHeader, tempEncryptedFile, 
tempDecryptedFile);
+        }
+
+        String metadataProviderClass = 
DefaultMetadataExtractor.class.getName();
+        SegmentMetadata segmentMetadata = 
getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);
+        LOGGER.info("Processing upload request for segment: {} of table: {} 
with upload type: {} from client: {}, "
+                + "ingestion descriptor: {}", segmentName, tableNameWithType, 
uploadType, clientAddress,
+            ingestionDescriptor);
+
+        // Validate segment
+        if (tableConfig.getIngestionConfig() == null || 
tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
+          SegmentValidationUtils.validateTimeInterval(segmentMetadata, 
tableConfig);
+        }
+
+        // Encrypt segment
+        String crypterNameInTableConfig = 
tableConfig.getValidationConfig().getCrypterClassName();
+        Pair<String, File> encryptionInfo =
+            encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, 
encryptSegment, crypterClassNameInHeader,
+                crypterNameInTableConfig, segmentName, tableNameWithType);
+        File segmentFile = encryptionInfo.getRight();
+
+        // Update download URI if controller is responsible for moving the 
segment to the deep store
+        URI finalSegmentLocationURI = null;
+        if (copySegmentToFinalLocation) {
+          URI dataDirURI = provider.getDataDirURI();
+          String dataDirPath = dataDirURI.toString();
+          String encodedSegmentName = URIUtils.encode(segmentName);
+          String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, 
rawTableName, encodedSegmentName);
+          if 
(dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME))
 {
+            segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), 
"segments", rawTableName, encodedSegmentName);
+          } else {
+            segmentDownloadURIStr = finalSegmentLocationPath;
+          }
+          finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
+        }
+        SegmentUploadMetadata segmentUploadMetadata =
+            new SegmentUploadMetadata(segmentDownloadURIStr, 
sourceDownloadURIStr, finalSegmentLocationURI,
+                segmentSizeInBytes, segmentMetadata, encryptionInfo);
+        segmentUploadMetadataList.add(segmentUploadMetadata);
+        LOGGER.info("Using segment download URI: {} for segment: {} of table: 
{} (move segment: {})",
+            segmentDownloadURIStr, segmentFile, tableNameWithType, 
copySegmentToFinalLocation);
+        // complete segment operations for all the segments
+        if (++entryCount == segmentMetadataFileMap.size()) {
+          ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, 
_controllerConf, _controllerMetrics);
+          zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, 
enableParallelPushProtection,
+              allowRefresh, headers, segmentUploadMetadataList);
+        }
+      }
+    } catch (Exception e) {
+      
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR,
+          segmentUploadMetadataList.size());
+      throw new ControllerApplicationException(LOGGER,
+          "Exception while processing segments to upload: " + e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    } finally {
+      cleanupTempFiles(tempFiles);
+    }
+
+    return new SuccessResponse(String.format("Successfully uploaded segments: 
%s of table: %s in %s ms",
+        segmentNames, tableNameWithType, System.currentTimeMillis() - 
segmentsUploadStartTimeMs));
+  }
+
+  private static class SegmentMetadataInfo {
+    private File _segmentCreationMetaFile;
+    private File _segmentPropertiesFile;
+
+    public File getSegmentCreationMetaFile() {
+      return _segmentCreationMetaFile;
+    }
+
+    public File getSegmentPropertiesFile() {
+      return _segmentPropertiesFile;
+    }
+
+    public void setSegmentCreationMetaFile(File file) {
+      _segmentCreationMetaFile = file;
+    }
+
+    public void setSegmentPropertiesFile(File file) {
+      _segmentPropertiesFile = file;
+    }
+  }
+
+  private void cleanupTempFiles(List<File> tempFiles) {
+    for (File tempFile : tempFiles) {
+      FileUtils.deleteQuietly(tempFile);
+    }
+  }
+
   @Nullable
   private String extractHttpHeader(HttpHeaders headers, String name) {
     String value = headers.getHeaderString(name);
@@ -555,6 +798,65 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/v3/segments")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Cluster.UPLOAD_SEGMENT)
+  @Authenticate(AccessType.CREATE)
+  @ApiOperation(value = "Upload a batch of segments", notes = "Upload a batch 
of segments with METADATA upload type")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 400, message = "Bad Request"),
+      @ApiResponse(code = 403, message = "Segment validation fails"),
+      @ApiResponse(code = 409, message = "Segment already exists or another 
parallel push in progress"),
+      @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+      @ApiResponse(code = 412, message = "CRC check fails"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  @TrackInflightRequestMetrics
+  @TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
+  // This multipart based endpoint is used to upload a list of segments in 
batch mode.
+  public void uploadSegmentsAsMultiPart(FormDataMultiPart multiPart,
+      @ApiParam(value = "Name of the table", required = true)
+      @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
+      String tableName,
+      @ApiParam(value = "Type of the table", required = true)
+      @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE)
+      String tableType,
+      @ApiParam(value = "Whether to enable parallel push protection")
+      @DefaultValue("false")
+      
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
+      boolean enableParallelPushProtection,
+      @ApiParam(value = "Whether to refresh if the segment already exists")
+      @DefaultValue("true")
+      @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
+      boolean allowRefresh,
+      @Context HttpHeaders headers,
+      @Context Request request,
+      @Suspended final AsyncResponse asyncResponse) {
+    if (StringUtils.isEmpty(tableName)) {
+      throw new ControllerApplicationException(LOGGER,
+          "tableName is a required field while uploading segments in batch 
mode.", Response.Status.BAD_REQUEST);
+    }
+    if (StringUtils.isEmpty(tableType)) {
+      throw new ControllerApplicationException(LOGGER,
+          "tableType is a required field while uploading segments in batch 
mode.", Response.Status.BAD_REQUEST);
+    }
+    if (multiPart == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "multiPart is a required field while uploading segments in batch 
mode.", Response.Status.BAD_REQUEST);
+    }
+    try {
+      asyncResponse.resume(
+          uploadSegments(tableName, 
TableType.valueOf(tableType.toUpperCase()), multiPart, 
enableParallelPushProtection,
+              allowRefresh, headers, request));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+
   @POST
   @ManagedAsync
   @Produces(MediaType.APPLICATION_JSON)
@@ -752,6 +1054,38 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
+  @VisibleForTesting
+  static void createSegmentFileFromBodyPart(FormDataBodyPart 
segmentMetadataBodyPart, File destFile)
+      throws IOException {
+    try (InputStream inputStream = 
segmentMetadataBodyPart.getValueAs(InputStream.class);
+        OutputStream outputStream = new FileOutputStream(destFile)) {
+      IOUtils.copyLarge(inputStream, outputStream);
+    } finally {
+      segmentMetadataBodyPart.cleanup();
+    }
+  }
+
+  static void createSegmentFileFromSegmentMetadataInfo(SegmentMetadataInfo 
metadataInfo, File destFile)
+      throws IOException {
+    File creationMetaFile = metadataInfo.getSegmentCreationMetaFile();
+    File metadataPropertiesFile = metadataInfo.getSegmentPropertiesFile();
+    String uuid = UUID.randomUUID().toString();
+    File segmentMetadataDir = new File(FileUtils.getTempDirectory(), 
"segmentMetadataDir-" + uuid);
+    FileUtils.copyFile(creationMetaFile, new File(segmentMetadataDir, 
"creation.meta"));
+    FileUtils.copyFile(metadataPropertiesFile, new File(segmentMetadataDir, 
"metadata.properties"));
+    File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(), 
"segmentMetadataTar-" + uuid
+        + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    if (segmentMetadataTarFile.exists()) {
+      FileUtils.forceDelete(segmentMetadataTarFile);
+    }
+    TarGzCompressionUtils.createTarGzFile(segmentMetadataDir, 
segmentMetadataTarFile);
+    try {
+      FileUtils.copyFile(segmentMetadataTarFile, destFile);
+    } finally {
+      FileUtils.forceDelete(segmentMetadataTarFile);
+    }
+  }
+
   private FileUploadType getUploadType(String uploadTypeStr) {
     if (uploadTypeStr != null) {
       return FileUploadType.valueOf(uploadTypeStr);
@@ -760,6 +1094,25 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
+  @VisibleForTesting
+  long getSegmentSizeFromFile(String sourceDownloadURIStr)
+      throws IOException {
+    long segmentSizeInBytes = -1;
+    PinotFS pinotFS = null;
+    try {
+      URI segmentURI = new URI(sourceDownloadURIStr);
+      pinotFS = PinotFSFactory.create(segmentURI.getScheme());
+      segmentSizeInBytes = pinotFS.length(segmentURI);
+    } catch (Exception e) {
+      LOGGER.warn(String.format("Exception while segment size for uri: %s", 
sourceDownloadURIStr), e);
+    } finally {
+      if (pinotFS != null) {
+        pinotFS.close();
+      }
+    }
+    return segmentSizeInBytes;
+  }
+
   // Validate that there is one file that is in the input.
   public static boolean validateMultiPart(Map<String, List<FormDataBodyPart>> 
map, String segmentName) {
     boolean isGood = true;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java
new file mode 100644
index 0000000000..d46891572b
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.upload;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Objects;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+
+
+/**
+ * Data object used while adding or updating segments. It's comprised of the 
following fields:
+ * <ol>
+ *   <li>segmentDownloadURIStr – The segment download URI persisted into the 
ZK metadata.</li>
+ *   <li>sourceDownloadURIStr – The URI from where the segment could be 
downloaded.</li>
+ *   <li>finalSegmentLocationURI – The final location of the segment in the 
deep-store.</li>
+ *   <li>segmentSizeInBytes – The segment size in bytes.</li>
+ *   <li>segmentMetadata – The segment metadata as defined in {@link 
org.apache.pinot.segment.spi.SegmentMetadata}.</li>
+ *   <li>encryptionInfo – A pair consisting of the crypter class used to 
encrypt the segment, and the encrypted segment
+ *   file.</li>
+ *   <li>segmentMetadataZNRecord – The segment metadata represented as a helix
+ *   {@link org.apache.helix.zookeeper.datamodel.ZNRecord}.</li>
+ * </ol>
+ */
+public class SegmentUploadMetadata {
+  private final String _segmentDownloadURIStr;
+  private final String _sourceDownloadURIStr;
+  private final URI _finalSegmentLocationURI;
+  private final Long _segmentSizeInBytes;
+  private final SegmentMetadata _segmentMetadata;
+  private final Pair<String, File> _encryptionInfo;
+  private ZNRecord _segmentMetadataZNRecord;
+
+  public SegmentUploadMetadata(String segmentDownloadURIStr, String 
sourceDownloadURIStr, URI finalSegmentLocationURI,
+      Long segmentSizeInBytes, SegmentMetadata segmentMetadata, Pair<String, 
File> encryptionInfo) {
+    _segmentDownloadURIStr = segmentDownloadURIStr;
+    _sourceDownloadURIStr = sourceDownloadURIStr;
+    _segmentSizeInBytes = segmentSizeInBytes;
+    _segmentMetadata = segmentMetadata;
+    _encryptionInfo = encryptionInfo;
+    _finalSegmentLocationURI = finalSegmentLocationURI;
+  }
+
+  public String getSegmentDownloadURIStr() {
+    return _segmentDownloadURIStr;
+  }
+
+  public String getSourceDownloadURIStr() {
+    return _sourceDownloadURIStr;
+  }
+
+  public URI getFinalSegmentLocationURI() {
+    return _finalSegmentLocationURI;
+  }
+
+  public Long getSegmentSizeInBytes() {
+    return _segmentSizeInBytes;
+  }
+
+  public SegmentMetadata getSegmentMetadata() {
+    return _segmentMetadata;
+  }
+
+  public Pair<String, File> getEncryptionInfo() {
+    return _encryptionInfo;
+  }
+
+  public void setSegmentMetadataZNRecord(ZNRecord segmentMetadataZNRecord) {
+    _segmentMetadataZNRecord = segmentMetadataZNRecord;
+  }
+
+  public ZNRecord getSegmentMetadataZNRecord() {
+    return _segmentMetadataZNRecord;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SegmentUploadMetadata that = (SegmentUploadMetadata) o;
+    return Objects.equals(_segmentDownloadURIStr, that._segmentDownloadURIStr)
+        && Objects.equals(_sourceDownloadURIStr, that._sourceDownloadURIStr)
+        && Objects.equals(_finalSegmentLocationURI, 
that._finalSegmentLocationURI)
+        && Objects.equals(_segmentSizeInBytes, that._segmentSizeInBytes)
+        && Objects.equals(_segmentMetadata, that._segmentMetadata)
+        && Objects.equals(_encryptionInfo, that._encryptionInfo)
+        && Objects.equals(_segmentMetadataZNRecord, 
that._segmentMetadataZNRecord);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_segmentDownloadURIStr, _sourceDownloadURIStr, 
_finalSegmentLocationURI,
+        _segmentSizeInBytes, _segmentMetadata, _encryptionInfo, 
_segmentMetadataZNRecord);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index b0aee83d0d..32249320b9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -21,9 +21,14 @@ package org.apache.pinot.controller.api.upload;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -110,6 +115,61 @@ public class ZKOperator {
     }
   }
 
+  // Complete segment operations for a list of segments in batch mode
+  public void completeSegmentsOperations(String tableNameWithType, 
FileUploadType uploadType,
+      boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders 
headers,
+      List<SegmentUploadMetadata> segmentUploadMetadataList)
+      throws Exception {
+    boolean refreshOnly =
+        
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+    List<SegmentUploadMetadata> newSegmentsList = new ArrayList<>();
+    List<SegmentUploadMetadata> existingSegmentsList = new ArrayList<>();
+    for (SegmentUploadMetadata segmentUploadMetadata: 
segmentUploadMetadataList) {
+      SegmentMetadata segmentMetadata = 
segmentUploadMetadata.getSegmentMetadata();
+      String segmentName = segmentMetadata.getName();
+
+      ZNRecord existingSegmentMetadataZNRecord =
+          
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, 
segmentName);
+      if (existingSegmentMetadataZNRecord != null && 
shouldProcessAsNewSegment(tableNameWithType, segmentName,
+          existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+        LOGGER.warn("Removing segment ZK metadata (recovering from previous 
upload failure) for table: {}, segment: {}",
+            tableNameWithType, segmentName);
+        
Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType,
 segmentName),
+            "Failed to remove segment ZK metadata for table: %s, segment: %s", 
tableNameWithType, segmentName);
+        existingSegmentMetadataZNRecord = null;
+      }
+
+      if (existingSegmentMetadataZNRecord == null) {
+        // Add a new segment
+        if (refreshOnly) {
+          throw new ControllerApplicationException(LOGGER,
+              String.format("Cannot refresh non-existing segment: %s for 
table: %s", segmentName, tableNameWithType),
+              Response.Status.GONE);
+        }
+        LOGGER.info("Adding new segment: {} to table: {}", segmentName, 
tableNameWithType);
+        newSegmentsList.add(segmentUploadMetadata);
+      } else {
+        // Refresh an existing segment
+        if (!allowRefresh) {
+          // We cannot perform this check up-front in UploadSegment API call. 
If a segment doesn't exist during the
+          // check done up-front but ends up getting created before the check 
here, we could incorrectly refresh an
+          // existing segment.
+          throw new ControllerApplicationException(LOGGER,
+              String.format("Segment: %s already exists in table: %s. Refresh 
not permitted.", segmentName,
+                  tableNameWithType), Response.Status.CONFLICT);
+        }
+        LOGGER.info("Segment: {} already exists in table: {}, refreshing it", 
segmentName, tableNameWithType);
+        
segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord);
+        existingSegmentsList.add(segmentUploadMetadata);
+      }
+    }
+    // process new segments
+    processNewSegments(tableNameWithType, uploadType, 
enableParallelPushProtection, headers, newSegmentsList);
+
+    // process existing segments
+    processExistingSegments(tableNameWithType, uploadType, 
enableParallelPushProtection, headers, existingSegmentsList);
+  }
+
   /**
    * Returns {@code true} when the segment should be processed as new segment.
    * <p>When segment ZK metadata exists, check if segment exists in the ideal 
state. If the previous upload failed after
@@ -276,6 +336,144 @@ public class ZKOperator {
     }
   }
 
+  // process a batch of existing segments
+  private void processExistingSegments(String tableNameWithType, 
FileUploadType uploadType,
+      boolean enableParallelPushProtection, HttpHeaders headers, 
List<SegmentUploadMetadata> segmentUploadMetadataList)
+      throws Exception {
+    for (SegmentUploadMetadata segmentUploadMetadata: 
segmentUploadMetadataList) {
+      SegmentMetadata segmentMetadata = 
segmentUploadMetadata.getSegmentMetadata();
+      String segmentDownloadURIStr = 
segmentUploadMetadata.getSegmentDownloadURIStr();
+      String sourceDownloadURIStr = 
segmentUploadMetadata.getSourceDownloadURIStr();
+      URI finalSegmentLocationURI = 
segmentUploadMetadata.getFinalSegmentLocationURI();
+      Pair<String, File> encryptionInfo = 
segmentUploadMetadata.getEncryptionInfo();
+      String crypterName = encryptionInfo.getLeft();
+      File segmentFile = encryptionInfo.getRight();
+      String segmentName = segmentMetadata.getName();
+      ZNRecord existingSegmentMetadataZNRecord = 
segmentUploadMetadata.getSegmentMetadataZNRecord();
+      long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes();
+      int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
+
+      // Check if CRC match when IF-MATCH header is set
+      SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(existingSegmentMetadataZNRecord);
+      long existingCrc = segmentZKMetadata.getCrc();
+      checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+
+      // Check segment upload start time when parallel push protection enabled
+      if (enableParallelPushProtection) {
+        // When segment upload start time is larger than 0, that means another 
upload is in progress
+        long segmentUploadStartTime = 
segmentZKMetadata.getSegmentUploadStartTime();
+        if (segmentUploadStartTime > 0) {
+          handleParallelPush(tableNameWithType, segmentName, 
segmentUploadStartTime);
+        }
+
+        // Lock the segment by setting the upload start time in ZK
+        
segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
+        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
+          throw new ControllerApplicationException(LOGGER,
+              String.format("Failed to lock the segment: %s of table: %s, 
retry later", segmentName, tableNameWithType),
+              Response.Status.CONFLICT);
+        } else {
+          // The version will increment if the zk metadata update is successful
+          expectedVersion++;
+        }
+      }
+
+      // Reset segment upload start time to unlock the segment later
+      // NOTE: reset this value even if parallel push protection is not 
enabled so that segment can recover in case
+      // previous segment upload did not finish properly and the parallel push 
protection is turned off
+      segmentZKMetadata.setSegmentUploadStartTime(-1);
+
+      try {
+        // Construct the segment ZK metadata custom map modifier
+        String customMapModifierStr =
+            
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER);
+        SegmentZKMetadataCustomMapModifier customMapModifier =
+            customMapModifierStr != null ? new 
SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null;
+
+        // Update ZK metadata and refresh the segment if necessary
+        long newCrc = Long.parseLong(segmentMetadata.getCrc());
+        if (newCrc == existingCrc) {
+          LOGGER.info(
+              "New segment crc '{}' is the same as existing segment crc for 
segment '{}'. Updating ZK metadata without "
+                  + "refreshing the segment.", newCrc, segmentName);
+          // NOTE: Even though we don't need to refresh the segment, we should 
still update the following fields:
+          // - Creation time (not included in the crc)
+          // - Refresh time
+          // - Custom map
+          
segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+          segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+          if (customMapModifier != null) {
+            
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+          } else {
+            // If no modifier is provided, use the custom map from the segment 
metadata
+            segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
+          }
+          if 
(!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) {
+            // For offline ingestion, it is quite common that the download.uri 
would change but the crc would be the
+            // same. E.g. a user re-runs the job which process the same data 
and segments are stored/pushed from a
+            // different path from the Deepstore. Read more: 
https://github.com/apache/pinot/issues/11535
+            LOGGER.info("Updating segment download url from: {} to: {} even 
though crc is the same",
+                segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr);
+            segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr);
+            // When download URI changes, we also need to copy the segment to 
the final location if existed.
+            // This typically means users changed the push type from METADATA 
to SEGMENT or SEGMENT to METADATA.
+            // Note that switching push type from SEGMENT to METADATA may lead 
orphan segments in the controller
+            // managed directory. Read more: 
https://github.com/apache/pinot/pull/11720
+            if (finalSegmentLocationURI != null) {
+              copySegmentToDeepStore(tableNameWithType, segmentName, 
uploadType, segmentFile, sourceDownloadURIStr,
+                  finalSegmentLocationURI);
+            }
+          }
+          if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
+            throw new RuntimeException(
+                String.format("Failed to update ZK metadata for segment: %s, 
table: %s, expected version: %d",
+                    segmentName, tableNameWithType, expectedVersion));
+          }
+        } else {
+          // New segment is different with the existing one, update ZK 
metadata and refresh the segment
+          LOGGER.info(
+              "New segment crc {} is different than the existing segment crc 
{}. Updating ZK metadata and refreshing "
+                  + "segment {}", newCrc, existingCrc, segmentName);
+          if (finalSegmentLocationURI != null) {
+            copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, 
segmentFile, sourceDownloadURIStr,
+                finalSegmentLocationURI);
+          }
+
+          // NOTE: Must first set the segment ZK metadata before trying to 
refresh because servers and brokers rely on
+          // segment ZK metadata to refresh the segment (server will compare 
the segment ZK metadata with the local
+          // metadata to decide whether to download the new segment; broker 
will update the segment partition info &
+          // time boundary based on the segment ZK metadata)
+          if (customMapModifier == null) {
+            // If no modifier is provided, use the custom map from the segment 
metadata
+            segmentZKMetadata.setCustomMap(null);
+            ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, 
segmentZKMetadata, segmentMetadata,
+                segmentDownloadURIStr, crypterName, segmentSizeInBytes);
+          } else {
+            // If modifier is provided, first set the custom map from the 
segment metadata, then apply the modifier
+            ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, 
segmentZKMetadata, segmentMetadata,
+                segmentDownloadURIStr, crypterName, segmentSizeInBytes);
+            
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+          }
+          if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
+            throw new RuntimeException(
+                String.format("Failed to update ZK metadata for segment: %s, 
table: %s, expected version: %d",
+                    segmentName, tableNameWithType, expectedVersion));
+          }
+          LOGGER.info("Updated segment: {} of table: {} to property store", 
segmentName, tableNameWithType);
+
+          // Send a message to servers and brokers hosting the table to 
refresh the segment
+          
_pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType, 
segmentName, true, true);
+        }
+      } catch (Exception e) {
+        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
+          LOGGER.error("Failed to update ZK metadata for segment: {}, table: 
{}, expected version: {}", segmentName,
+              tableNameWithType, expectedVersion);
+        }
+        throw e;
+      }
+    }
+  }
+
   private void checkCRC(HttpHeaders headers, String tableNameWithType, String 
segmentName, long existingCrc) {
     String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
     if (expectedCrcStr != null) {
@@ -374,6 +572,102 @@ public class ZKOperator {
     }
   }
 
+  // process a batch of new segments
+  private void processNewSegments(String tableNameWithType, FileUploadType 
uploadType,
+      boolean enableParallelPushProtection, HttpHeaders headers, 
List<SegmentUploadMetadata> segmentUploadMetadataList)
+      throws Exception {
+    Map<String, SegmentZKMetadata> segmentZKMetadataMap = new HashMap<>();
+    List<String> segmentNames = new ArrayList<>();
+    long segmentUploadStartTime = System.currentTimeMillis();
+    for (SegmentUploadMetadata segmentUploadMetadata: 
segmentUploadMetadataList) {
+      SegmentMetadata segmentMetadata = 
segmentUploadMetadata.getSegmentMetadata();
+      String segmentName = segmentMetadata.getName();
+      SegmentZKMetadata newSegmentZKMetadata;
+      URI finalSegmentLocationURI = 
segmentUploadMetadata.getFinalSegmentLocationURI();
+      String segmentDownloadURIStr = 
segmentUploadMetadata.getSegmentDownloadURIStr();
+      String sourceDownloadURIStr = 
segmentUploadMetadata.getSourceDownloadURIStr();
+      String crypterName = segmentUploadMetadata.getEncryptionInfo().getLeft();
+      long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes();
+      File segmentFile = segmentUploadMetadata.getEncryptionInfo().getRight();
+      try {
+        newSegmentZKMetadata = 
ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata,
+            segmentDownloadURIStr, crypterName, segmentSizeInBytes);
+        segmentZKMetadataMap.put(segmentName, newSegmentZKMetadata);
+        segmentNames.add(segmentName);
+      } catch (IllegalArgumentException e) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Got invalid segment metadata when adding segment: 
%s for table: %s, reason: %s", segmentName,
+                tableNameWithType, e.getMessage()), 
Response.Status.BAD_REQUEST);
+      }
+
+      // Lock if enableParallelPushProtection is true.
+      if (enableParallelPushProtection) {
+        newSegmentZKMetadata.setSegmentUploadStartTime(segmentUploadStartTime);
+      }
+
+      // Update zk metadata custom map
+      String segmentZKMetadataCustomMapModifierStr = headers != null ? 
headers.getHeaderString(
+          
FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER) 
: null;
+      if (segmentZKMetadataCustomMapModifierStr != null) {
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier 
= new SegmentZKMetadataCustomMapModifier(
+            segmentZKMetadataCustomMapModifierStr);
+        
newSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(
+            newSegmentZKMetadata.getCustomMap()));
+      }
+      if 
(!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, 
newSegmentZKMetadata)) {
+        throw new RuntimeException(String.format("Failed to create ZK metadata 
for segment: %s of table: %s",
+            segmentName, tableNameWithType));
+      }
+
+      if (finalSegmentLocationURI != null) {
+        try {
+          copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, 
segmentFile, sourceDownloadURIStr,
+              finalSegmentLocationURI);
+        } catch (Exception e) {
+          // Cleanup the Zk entry and the segment from the permanent directory 
if it exists.
+          LOGGER.error("Could not move segment {} from table {} to permanent 
directory",
+              segmentName, tableNameWithType, e);
+          // Delete all segments that are getting processed as we are in batch 
mode
+          deleteSegmentsIfNeeded(tableNameWithType, segmentNames, 
segmentUploadStartTime, enableParallelPushProtection);
+          throw e;
+        }
+      }
+    }
+
+    try {
+      _pinotHelixResourceManager.assignTableSegments(tableNameWithType, 
segmentNames);
+    } catch (Exception e) {
+      // assignTableSegment removes the zk entry.
+      // Call deleteSegment to remove the segment from permanent location if 
needed.
+      LOGGER.error("Caught exception while calling assignTableSegments for 
adding segments: {} to table: {}",
+          segmentZKMetadataMap.keySet(), tableNameWithType, e);
+      deleteSegmentsIfNeeded(tableNameWithType, segmentNames, 
segmentUploadStartTime, enableParallelPushProtection);
+      throw e;
+    }
+
+    for (Map.Entry<String, SegmentZKMetadata> segmentZKMetadataEntry: 
segmentZKMetadataMap.entrySet()) {
+      SegmentZKMetadata newSegmentZKMetadata = 
segmentZKMetadataEntry.getValue();
+      String segmentName = segmentZKMetadataEntry.getKey();
+      if (enableParallelPushProtection) {
+        // Release lock. Expected version will be 0 as we hold a lock and no 
updates could take place meanwhile.
+        newSegmentZKMetadata.setSegmentUploadStartTime(-1);
+        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
newSegmentZKMetadata, 0)) {
+          // There is a race condition when it took too much time for the 1st 
segment upload to process (due to slow
+          // PinotFS access), which leads to the 2nd attempt of segment 
upload, and the 2nd segment upload succeeded.
+          // In this case, when the 1st upload comes back, it shouldn't 
blindly delete the segment when it failed to
+          // update the zk metadata. Instead, the 1st attempt should validate 
the upload start time one more time.
+          // If the start time doesn't match with the one persisted in zk 
metadata, segment deletion should be skipped.
+          String errorMsg = String.format("Failed to update ZK metadata for 
segment: %s of table: %s", segmentName,
+              tableNameWithType);
+          LOGGER.error(errorMsg);
+          // Delete all segments that are getting processed as we are in batch 
mode
+          deleteSegmentsIfNeeded(tableNameWithType, segmentNames, 
segmentUploadStartTime, true);
+          throw new RuntimeException(errorMsg);
+        }
+      }
+    }
+  }
+
   /**
    * Deletes the segment to be uploaded if either one of the criteria is 
qualified:
    * 1) the uploadStartTime matches with the one persisted in ZK metadata.
@@ -397,6 +691,33 @@ public class ZKOperator {
     }
   }
 
+  /**
+   * Deletes the segments to be uploaded if either one of the criteria is 
qualified:
+   * 1) the uploadStartTime matches with the one persisted in ZK metadata.
+   * 2) enableParallelPushProtection is not enabled.
+   */
+  private void deleteSegmentsIfNeeded(String tableNameWithType, List<String> 
segmentNames,
+      long currentSegmentUploadStartTime, boolean 
enableParallelPushProtection) {
+    List<String> segmentsToDelete = new ArrayList<>();
+    for (String segmentName: segmentNames) {
+      ZNRecord existingSegmentMetadataZNRecord =
+          
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, 
segmentName);
+      if (existingSegmentMetadataZNRecord == null) {
+        continue;
+      }
+      // Check if the upload start time is set by this thread itself, if yes 
delete the segment.
+      SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(existingSegmentMetadataZNRecord);
+      long existingSegmentUploadStartTime = 
segmentZKMetadata.getSegmentUploadStartTime();
+      LOGGER.info("Parallel push protection is {} for segment: {}.",
+          (enableParallelPushProtection ? "enabled" : "disabled"), 
segmentName);
+      if (!enableParallelPushProtection || currentSegmentUploadStartTime == 
existingSegmentUploadStartTime) {
+        segmentsToDelete.add(segmentName);
+      }
+    }
+    _pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentsToDelete);
+    LOGGER.info("Deleted zk entry and segments {} for table {}.", 
segmentsToDelete, tableNameWithType);
+  }
+
   private void copySegmentToDeepStore(String tableNameWithType, String 
segmentName, FileUploadType uploadType,
       File segmentFile, String sourceDownloadURIStr, URI 
finalSegmentLocationURI)
       throws Exception {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 2b835faaae..8748a021b3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2327,6 +2327,84 @@ public class PinotHelixResourceManager {
     }
   }
 
+  // Assign a list of segments in batch mode
+  public void assignTableSegments(String tableNameWithType, List<String> 
segmentNames) {
+    Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+    for (String segmentName: segmentNames) {
+      String segmentZKMetadataPath = 
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
+          segmentName);
+      segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+    }
+    // Assign instances for the segment and add it into IdealState
+    try {
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: " + tableNameWithType);
+
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only in case direct tier assignment is 
configured
+      if (_enableTieredSegmentAssignment && 
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = 
TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+        for (String segmentName: segmentNames) {
+          // Update segment tier to support direct assignment for multiple 
data directories
+          updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+          InstancePartitions tierInstancePartitions = 
TierConfigUtils.getTieredInstancePartitionsForSegment(
+              tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+          if (tierInstancePartitions != null && 
TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+            // Override instance partitions for offline table
+            LOGGER.info("Overriding with tiered instance partitions: {} for 
segment: {} of table: {}",
+                tierInstancePartitions, segmentName, tableNameWithType);
+            instancePartitionsMap = 
Collections.singletonMap(InstancePartitionsType.OFFLINE, 
tierInstancePartitions);
+          }
+        }
+      }
+
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
tableConfig, _controllerMetrics);
+      synchronized (getTableUpdaterLock(tableNameWithType)) {
+        long segmentAssignmentStartTs = System.currentTimeMillis();
+        Map<InstancePartitionsType, InstancePartitions> 
finalInstancePartitionsMap = instancePartitionsMap;
+        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
+          assert idealState != null;
+          for (String segmentName: segmentNames) {
+            Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
+            if (currentAssignment.containsKey(segmentName)) {
+              LOGGER.warn("Segment: {} already exists in the IdealState for 
table: {}, do not update", segmentName,
+                  tableNameWithType);
+            } else {
+              List<String> assignedInstances =
+                  segmentAssignment.assignSegment(segmentName, 
currentAssignment, finalInstancePartitionsMap);
+              LOGGER.info("Assigning segment: {} to instances: {} for table: 
{}", segmentName, assignedInstances,
+                  tableNameWithType);
+              currentAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+                  SegmentStateModel.ONLINE));
+            }
+          }
+          return idealState;
+        });
+        LOGGER.info("Added segments: {} to IdealState for table: {} in {} ms", 
segmentNames, tableNameWithType,
+            System.currentTimeMillis() - segmentAssignmentStartTs);
+      }
+    } catch (Exception e) {
+      LOGGER.error(
+          "Caught exception while adding segments: {} to IdealState for table: 
{}, deleting segments ZK metadata",
+          segmentNames, tableNameWithType, e);
+      for (Map.Entry<String, String> segmentZKMetadataPathEntry: 
segmentZKMetadataPathMap.entrySet()) {
+        String segmentName = segmentZKMetadataPathEntry.getKey();
+        String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue();
+        if (_propertyStore.remove(segmentZKMetadataPath, 
AccessOption.PERSISTENT)) {
+          LOGGER.info("Deleted segment ZK metadata for segment: {} of table: 
{}", segmentName, tableNameWithType);
+        } else {
+          LOGGER.error("Failed to delete segment ZK metadata for segment: {} 
of table: {}", segmentName,
+              tableNameWithType);
+        }
+      }
+      throw e;
+    }
+  }
+
   private Map<InstancePartitionsType, InstancePartitions> 
fetchOrComputeInstancePartitions(String tableNameWithType,
       TableConfig tableConfig) {
     if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java
similarity index 78%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java
index 35fdae8448..4729cd282f 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.segupload;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Function;
@@ -27,7 +27,10 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner;
 import 
org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
 import 
org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner;
@@ -59,6 +62,7 @@ import org.testng.annotations.Test;
  * todo: add test for URI push
  */
 public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
+  private static String _tableNameSuffix;
 
   @Override
   protected Map<String, String> getStreamConfigs() {
@@ -93,6 +97,7 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
   @BeforeMethod
   public void setUpTest()
       throws IOException {
+    _tableNameSuffix = RandomStringUtils.randomAlphabetic(12);
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
   }
 
@@ -136,15 +141,15 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
     jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
     jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
     TableSpec tableSpec = new TableSpec();
-    tableSpec.setTableName(DEFAULT_TABLE_NAME);
-    
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
+    tableSpec.setTableName(getTableName() + "_OFFLINE");
+    
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName()));
     jobSpec.setTableSpec(tableSpec);
     PinotClusterSpec clusterSpec = new PinotClusterSpec();
     clusterSpec.setControllerURI(getControllerBaseApiUrl());
     jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
 
     File dataDir = new File(_controllerConfig.getDataDir());
-    File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
+    File dataDirSegments = new File(dataDir, getTableName());
 
     // Not present in dataDir, only present in sourceDir
     Assert.assertFalse(dataDirSegments.exists());
@@ -204,6 +209,78 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
     testCountStar(numDocs);
   }
 
+  @Test
+  public void testUploadMultipleSegmentsInBatchModeAndQuery()
+      throws Exception {
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    waitForEVToDisappear(offlineTableConfig.getTableName());
+    addTableConfig(offlineTableConfig);
+
+    List<File> avroFiles = getAllAvroFiles();
+    int numSegments = 12;
+
+    // Create the list of segments
+    for (int segNum = 0; segNum < numSegments; segNum++) {
+      ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(segNum % 
12), offlineTableConfig, schema,
+          "_seg" + segNum, _segmentDir, _tarDir);
+    }
+
+    SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
+    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
+    // enable batch mode
+    pushJobSpec.setBatchMode(true);
+    jobSpec.setPushJobSpec(pushJobSpec);
+    PinotFSSpec fsSpec = new PinotFSSpec();
+    fsSpec.setScheme("file");
+    fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
+    jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
+    jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(getTableName() + "_OFFLINE");
+    
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName()));
+    jobSpec.setTableSpec(tableSpec);
+    PinotClusterSpec clusterSpec = new PinotClusterSpec();
+    clusterSpec.setControllerURI(getControllerBaseApiUrl());
+    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
+
+    File dataDir = new File(_controllerConfig.getDataDir());
+    File dataDirSegments = new File(dataDir, getTableName());
+
+    // Not present in dataDir, only present in sourceDir
+    Assert.assertFalse(dataDirSegments.exists());
+    Assert.assertEquals(_tarDir.listFiles().length, numSegments);
+
+    runner.init(jobSpec);
+    runner.run();
+
+    // Segment should be seen in dataDir
+    Assert.assertTrue(dataDirSegments.exists());
+    Assert.assertEquals(dataDirSegments.listFiles().length, numSegments);
+    Assert.assertEquals(_tarDir.listFiles().length, numSegments);
+
+    // test segment loaded
+    JsonNode segmentsList = getSegmentsList();
+    Assert.assertEquals(segmentsList.size(), numSegments);
+    long numDocs = 0;
+    for (JsonNode segmentName: segmentsList) {
+      numDocs += getNumDocs(segmentName.asText());
+    }
+    testCountStar(numDocs);
+
+    // Clear segment and tar dir
+    for (File segment : _segmentDir.listFiles()) {
+      FileUtils.deleteQuietly(segment);
+    }
+    for (File tar : _tarDir.listFiles()) {
+      FileUtils.deleteQuietly(tar);
+    }
+  }
+
   /**
    * Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner while 
enabling consistent data push.
    * Checks that segments are properly loaded and segment lineage entry were 
also in expected states.
@@ -237,15 +314,15 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
     jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
     jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
     TableSpec tableSpec = new TableSpec();
-    tableSpec.setTableName(DEFAULT_TABLE_NAME);
-    
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
+    tableSpec.setTableName(getTableName() + "_OFFLINE");
+    
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName()));
     jobSpec.setTableSpec(tableSpec);
     PinotClusterSpec clusterSpec = new PinotClusterSpec();
     clusterSpec.setControllerURI(getControllerBaseApiUrl());
     jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
 
     File dataDir = new File(_controllerConfig.getDataDir());
-    File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
+    File dataDirSegments = new File(dataDir, getTableName());
 
     Assert.assertEquals(_tarDir.listFiles().length, 1);
 
@@ -268,7 +345,7 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
     // Fetch segment lineage entry after running segment metadata push with 
consistent push enabled.
     String segmentLineageResponse = ControllerTest.sendGetRequest(
         ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl())
-            .forListAllSegmentLineages(DEFAULT_TABLE_NAME, 
TableType.OFFLINE.toString()));
+            .forListAllSegmentLineages(getTableName(), 
TableType.OFFLINE.toString()));
     // Segment lineage should be in completed state.
     
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
     // SegmentsFrom should be empty as we started with a blank table.
@@ -317,7 +394,7 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
     // Fetch segment lineage entry after running segment tar push with 
consistent push enabled.
     segmentLineageResponse = ControllerTest.sendGetRequest(
         ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl())
-            .forListAllSegmentLineages(DEFAULT_TABLE_NAME, 
TableType.OFFLINE.toString()));
+            .forListAllSegmentLineages(getTableName(), 
TableType.OFFLINE.toString()));
     // Segment lineage should be in completed state.
     
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
     // SegmentsFrom should contain the previous segment
@@ -337,14 +414,14 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
   private long getNumDocs(String segmentName)
       throws IOException {
     return JsonUtils.stringToJsonNode(
-            
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME,
 segmentName)))
+            
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(), 
segmentName)))
         .get("segment.total.docs").asLong();
   }
 
   private JsonNode getSegmentsList()
       throws IOException {
     return JsonUtils.stringToJsonNode(sendGetRequest(
-            _controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, 
TableType.OFFLINE.toString())))
+            _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), 
TableType.OFFLINE.toString())))
         .get(0).get("OFFLINE");
   }
 
@@ -362,6 +439,11 @@ public class SegmentUploadIntegrationTest extends 
BaseClusterIntegrationTest {
     }, 100L, 300_000, "Failed to load " + countStarResult + " documents", 
true);
   }
 
+  @Override
+  public String getTableName() {
+    return DEFAULT_TABLE_NAME + _tableNameSuffix;
+  }
+
   @AfterMethod
   public void tearDownTest()
       throws IOException {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 8b0963578e..d997f8f0cc 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -179,14 +180,14 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     _pinotTaskConfig = pinotTaskConfig;
     _eventObserver = 
MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
     String taskType = pinotTaskConfig.getTaskType();
-    Map<String, String> configs = pinotTaskConfig.getConfigs();
-    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-    String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
+    String tableNameWithType = taskConfigs.get(MinionConstants.TABLE_NAME_KEY);
+    String inputSegmentNames = 
taskConfigs.get(MinionConstants.SEGMENT_NAME_KEY);
     String[] segmentNames = 
inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
-    String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
-    String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
+    String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+    String downloadURLString = 
taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = 
downloadURLString.split(MinionConstants.URL_SEPARATOR);
-    AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+    AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
     LOGGER.info("Start executing {} on table: {}, input segments: {} with 
downloadURLs: {}, uploadURL: {}", taskType,
         tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), 
taskType), "tmp-" + UUID.randomUUID());
@@ -274,6 +275,8 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
       SegmentUploadContext segmentUploadContext = new 
SegmentUploadContext(pinotTaskConfig, segmentConversionResults);
       preUploadSegments(segmentUploadContext);
+      Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+      PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs);
 
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
@@ -282,51 +285,60 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         String resultSegmentName = segmentConversionResult.getSegmentName();
         _eventObserver.notifyProgress(_pinotTaskConfig,
             String.format("Uploading segment: %s (%d out of %d)", 
resultSegmentName, (i + 1), numOutputSegments));
-
-        // Set segment ZK metadata custom map modifier into HTTP header to 
modify the segment ZK metadata
-        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
-            getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, 
segmentConversionResult);
-        Header segmentZKMetadataCustomMapModifierHeader =
-            new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
-                segmentZKMetadataCustomMapModifier.toJsonString());
-
-        String pushMode =
-            configs.getOrDefault(BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.TAR.name());
+        String pushMode = 
taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
+            BatchConfigProperties.SegmentPushType.TAR.name());
         URI outputSegmentTarURI;
         if 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
             != BatchConfigProperties.SegmentPushType.TAR) {
-          outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, 
convertedTarredSegmentFile);
+          outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, 
convertedTarredSegmentFile);
           LOGGER.info("Moved generated segment from [{}] to location: [{}]", 
convertedTarredSegmentFile,
               outputSegmentTarURI);
         } else {
           outputSegmentTarURI = convertedTarredSegmentFile.toURI();
         }
 
-        List<Header> httpHeaders = new ArrayList<>();
-        httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
-        httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
-
+        // Set segment ZK metadata custom map modifier into HTTP header to 
modify the segment ZK metadata
+        List<Header> httpHeaders = 
getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, 
segmentConversionResults);
         // Set parameters for upload request
-        NameValuePair enableParallelPushProtectionParameter =
-            new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
 "true");
-        NameValuePair tableNameParameter = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
-            TableNameBuilder.extractRawTableName(tableNameWithType));
-        NameValuePair tableTypeParameter = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
-            
TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
+        List<NameValuePair> parameters = 
getSegmentPushCommonParams(tableNameWithType);
+
         // RealtimeToOfflineSegmentsTask pushed segments to the corresponding 
offline table
         // TODO: This is not clean to put the override here, but let's think 
about it harder to see what is the proper
         //  way to override it.
         if 
(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) {
-          tableTypeParameter = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
-              TableType.OFFLINE.toString());
+          Iterator<NameValuePair> paramItr = parameters.iterator();
+          while (paramItr.hasNext()) {
+            NameValuePair nameValuePair = paramItr.next();
+            if 
(FileUploadDownloadClient.QueryParameters.TABLE_TYPE.equals(nameValuePair.getName()))
 {
+              paramItr.remove();
+              parameters.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+                  TableType.OFFLINE.toString()));
+              break;
+            }
+          }
+        }
+
+        if 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
+            == BatchConfigProperties.SegmentPushType.METADATA) {
+          updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, 
segmentConversionResult,
+              segmentUriToTarPathMap, pushJobSpec);
+        } else {
+          pushSegment(taskConfigs, outputSegmentTarURI, httpHeaders, 
parameters, segmentConversionResult);
+          if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+            LOGGER.warn("Failed to delete tarred converted segment: {}", 
convertedTarredSegmentFile.getAbsolutePath());
+          }
         }
-        List<NameValuePair> parameters = 
Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter,
-            tableTypeParameter);
+      }
 
-        pushSegment(tableNameParameter.getValue(), configs, 
outputSegmentTarURI, httpHeaders, parameters,
-            segmentConversionResult);
-        if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
-          LOGGER.warn("Failed to delete tarred converted segment: {}", 
convertedTarredSegmentFile.getAbsolutePath());
+      if (!segmentUriToTarPathMap.isEmpty()) {
+        // For metadata push, push all segments in batch mode
+        pushJobSpec.setBatchMode(true);
+        pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, 
segmentUriToTarPathMap, pushJobSpec, authProvider,
+            segmentConversionResults);
+        for (File convertedTarredSegmentFile: tarredSegmentFiles) {
+          if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+            LOGGER.warn("Failed to delete tarred converted segment: {}", 
convertedTarredSegmentFile.getAbsolutePath());
+          }
         }
       }
 
@@ -335,9 +347,8 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       String outputSegmentNames = 
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
           .collect(Collectors.joining(","));
       postProcess(pinotTaskConfig);
-      LOGGER
-          .info("Done executing {} on table: {}, input segments: {}, output 
segments: {}", taskType, tableNameWithType,
-              inputSegmentNames, outputSegmentNames);
+      LOGGER.info("Done executing {} on table: {}, input segments: {}, output 
segments: {}", taskType,
+          tableNameWithType, inputSegmentNames, outputSegmentNames);
 
       return segmentConversionResults;
     } finally {
@@ -345,50 +356,107 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     }
   }
 
-  private void pushSegment(String tableName, Map<String, String> taskConfigs, 
URI outputSegmentTarURI,
-      List<Header> headers, List<NameValuePair> parameters, 
SegmentConversionResult segmentConversionResult)
-      throws Exception {
-    String pushMode =
-        taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.TAR.name());
-    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
+  private void updateSegmentUriToTarPathMap(Map<String, String> taskConfigs, 
URI outputSegmentTarURI,
+      SegmentConversionResult segmentConversionResult, Map<String, String> 
segmentUriToTarPathMap,
+      PushJobSpec pushJobSpec) {
+    String segmentName = segmentConversionResult.getSegmentName();
+    if 
(!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      throw new RuntimeException(String.format("Output dir URI missing for 
metadata push while processing segment: %s",
+          segmentName));
+    }
+    URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    Map<String, String> localSegmentUriToTarPathMap =
+        SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, 
pushJobSpec,
+            new String[]{outputSegmentTarURI.toString()});
+    if (!localSegmentUriToTarPathMap.isEmpty()) {
+      segmentUriToTarPathMap.putAll(localSegmentUriToTarPathMap);
+    }
+  }
 
+  private PushJobSpec getPushJobSpec(Map<String, String> taskConfigs) {
     PushJobSpec pushJobSpec = new PushJobSpec();
     pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
     pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
     pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
     
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
     
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+    return pushJobSpec;
+  }
 
-    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, 
taskConfigs, pushJobSpec);
-
-    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
-      case TAR:
-          File tarFile = new File(outputSegmentTarURI);
-          String segmentName = segmentConversionResult.getSegmentName();
-          String tableNameWithType = 
segmentConversionResult.getTableNameWithType();
-          String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
-          SegmentConversionUtils.uploadSegment(taskConfigs, headers, 
parameters, tableNameWithType, segmentName,
-              uploadURL, tarFile);
-        break;
-      case METADATA:
-        if 
(taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
-          URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
-          try (PinotFS outputFileFS = 
MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
-            Map<String, String> segmentUriToTarPathMap =
-                
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
-                    new String[]{outputSegmentTarURI.toString()});
-            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, 
segmentUriToTarPathMap, headers, parameters);
-          }
-        } else {
-          throw new RuntimeException("Output dir URI missing for metadata 
push");
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("Unrecognized push mode - " + 
pushMode);
+  private List<Header> getSegmentPushCommonHeaders(PinotTaskConfig 
pinotTaskConfig, AuthProvider authProvider,
+      List<SegmentConversionResult> segmentConversionResults) {
+    SegmentConversionResult segmentConversionResult;
+    if (segmentConversionResults.size() == 1) {
+      segmentConversionResult = segmentConversionResults.get(0);
+    } else {
+      // Setting to null as the base method expects a single object. This is 
ok for now, since the
+      // segmentConversionResult is not made use of while generating the 
customMap.
+      segmentConversionResult = null;
+    }
+    SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
+        getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, 
segmentConversionResult);
+    Header segmentZKMetadataCustomMapModifierHeader =
+        new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+            segmentZKMetadataCustomMapModifier.toJsonString());
+
+    List<Header> headers = new ArrayList<>();
+    headers.add(segmentZKMetadataCustomMapModifierHeader);
+    headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
+    return headers;
+  }
+
+  private List<NameValuePair> getSegmentPushCommonParams(String 
tableNameWithType) {
+    List<NameValuePair> params = new ArrayList<>();
+    params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
+        "true"));
+    params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+        TableNameBuilder.extractRawTableName(tableNameWithType)));
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType != null) {
+      params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, 
tableType.toString()));
+    } else {
+      throw new RuntimeException(String.format("Failed to determine the 
tableType from name: %s", tableNameWithType));
+    }
+    return params;
+  }
+
+  private void pushSegments(String tableNameWithType, Map<String, String> 
taskConfigs, PinotTaskConfig pinotTaskConfig,
+      Map<String, String> segmentUriToTarPathMap, PushJobSpec pushJobSpec,
+      AuthProvider authProvider, List<SegmentConversionResult> 
segmentConversionResults)
+      throws Exception {
+    String tableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    SegmentGenerationJobSpec spec = 
generateSegmentGenerationJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    List<Header> headers = getSegmentPushCommonHeaders(pinotTaskConfig, 
authProvider, segmentConversionResults);
+    List<NameValuePair> parameters = 
getSegmentPushCommonParams(tableNameWithType);
+
+    URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, 
outputSegmentDirURI)) {
+      SegmentPushUtils.sendSegmentsUriAndMetadata(spec, outputFileFS, 
segmentUriToTarPathMap, headers, parameters);
+    }
+  }
+
+  private void pushSegment(Map<String, String> taskConfigs, URI 
outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters, 
SegmentConversionResult segmentConversionResult)
+      throws Exception {
+    String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
+        BatchConfigProperties.SegmentPushType.TAR.name());
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
+
+    if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
+        == BatchConfigProperties.SegmentPushType.TAR) {
+      File tarFile = new File(outputSegmentTarURI);
+      String segmentName = segmentConversionResult.getSegmentName();
+      String tableNameWithType = 
segmentConversionResult.getTableNameWithType();
+      String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+      SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, 
tableNameWithType, segmentName, uploadURL,
+          tarFile);
+    } else {
+      throw new UnsupportedOperationException("Unrecognized push mode: " + 
pushMode);
     }
   }
 
-  private SegmentGenerationJobSpec generatePushJobSpec(String tableName, 
Map<String, String> taskConfigs,
+  private SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String 
tableName, Map<String, String> taskConfigs,
       PushJobSpec pushJobSpec) {
 
     TableSpec tableSpec = new TableSpec();
@@ -416,7 +484,7 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       if 
(!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) 
&& outputFileFS.exists(
           outputSegmentTarURI)) {
         throw new RuntimeException(String.format("Output file: %s already 
exists. "
-                + "Set 'overwriteOutput' to true to ignore this error", 
outputSegmentTarURI));
+            + "Set 'overwriteOutput' to true to ignore this error", 
outputSegmentTarURI));
       } else {
         outputFileFS.copyFromLocalFile(localSegmentTarFile, 
outputSegmentTarURI);
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 4a5dd21948..cc3f008771 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -359,6 +359,136 @@ public class SegmentPushUtils implements Serializable {
     }
   }
 
+  public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, 
PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, List<Header> headers, 
List<NameValuePair> parameters)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    List<String> segmentURIs = new ArrayList<>();
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for table 
{}", segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      // segments stored in Pinot deep store do not have .tar.gz extension
+      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+          ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+      File segmentMetadataFile;
+      // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the remote
+      // directory. This is to avoid generating a new segment metadata tar gz 
file every time we push a segment,
+      // which requires downloading the entire segment tar gz file.
+
+      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, 
segmentName);
+      LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
+      if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
+        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+            "segmentMetadata-" + UUID.randomUUID() + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        if (segmentMetadataFile.exists()) {
+          FileUtils.forceDelete(segmentMetadataFile);
+        }
+        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+      } else {
+        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
+      }
+      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+      segmentURIs.add(segmentName);
+      segmentURIs.add(segmentUriPath);
+    }
+
+    // FIXME: Move this to a separate method
+    String uuid = UUID.randomUUID().toString();
+    File allSegmentsMetadataDir = new File(FileUtils.getTempDirectory(), 
"allSegmentsMetadataDir-" + uuid);
+    FileUtils.forceMkdir(allSegmentsMetadataDir);
+    for (Map.Entry<String, File> segmentMetadataTarFileEntry: 
segmentMetadataFileMap.entrySet()) {
+      String segmentName = segmentMetadataTarFileEntry.getKey();
+      File tarFile = segmentMetadataTarFileEntry.getValue();
+      TarGzCompressionUtils.untarOneFile(tarFile, 
V1Constants.MetadataKeys.METADATA_FILE_NAME,
+          new File(allSegmentsMetadataDir, segmentName + "." + 
V1Constants.MetadataKeys.METADATA_FILE_NAME));
+      TarGzCompressionUtils.untarOneFile(tarFile, 
V1Constants.SEGMENT_CREATION_META,
+          new File(allSegmentsMetadataDir, segmentName + "." + 
V1Constants.SEGMENT_CREATION_META));
+    }
+    File allSegmentsMetadataTarFile = new File(FileUtils.getTempDirectory(), 
"allSegmentsMetadataTar-" + uuid
+        + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    if (allSegmentsMetadataTarFile.exists()) {
+      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+    }
+    // Add a file which contains the download URI of all the segments
+    File segmentsURIFile = new File(allSegmentsMetadataDir, 
"all_segments_metadata");
+    FileUtils.writeLines(segmentsURIFile, segmentURIs);
+
+    TarGzCompressionUtils.createTarGzFile(allSegmentsMetadataDir, 
allSegmentsMetadataTarFile);
+    Map<String, File> allSegmentsMetadataMap = new HashMap<>();
+    allSegmentsMetadataMap.put("allSegments", allSegmentsMetadataTarFile);
+
+    // perform metadata push in batch mode for every cluster
+    try {
+      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+        URI controllerURI;
+        try {
+          controllerURI = new URI(pinotClusterSpec.getControllerURI());
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
+        }
+        LOGGER.info("Pushing segments: {} to Pinot cluster: {} for table {}",
+            segmentMetadataFileMap.keySet(), controllerURI, tableName);
+        int attempts = 1;
+        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
+          attempts = spec.getPushJobSpec().getPushAttempts();
+        }
+        long retryWaitMs = 1000L;
+        if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+          retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+        }
+        RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
+          List<Header> reqHttpHeaders = new ArrayList<>(headers);
+          try {
+            addHeaders(spec, reqHttpHeaders);
+            URI segmentUploadURI = getSegmentUploadURI(controllerURI);
+            SimpleHttpResponse response = 
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
+                allSegmentsMetadataMap, reqHttpHeaders, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+            LOGGER.info("Response for pushing table {} segments {} to location 
{} - {}: {}", tableName,
+                segmentMetadataFileMap.keySet(), controllerURI, 
response.getStatusCode(), response.getResponse());
+            return true;
+          } catch (HttpErrorStatusException e) {
+            int statusCode = e.getStatusCode();
+            if (statusCode >= 500) {
+              // Temporary exception
+              LOGGER.warn("Caught temporary exception while pushing table: {} 
segments: {} to {}, will retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, 
e);
+              return false;
+            } else {
+              // Permanent exception
+              LOGGER.error("Caught permanent exception while pushing table: {} 
segments: {} to {}, won't retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, 
e);
+              throw e;
+            }
+          }
+        });
+      }
+    } finally {
+      for (Map.Entry<String, File> metadataFileEntry: 
segmentMetadataFileMap.entrySet()) {
+        FileUtils.deleteQuietly(metadataFileEntry.getValue());
+      }
+      FileUtils.deleteDirectory(allSegmentsMetadataDir);
+      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+    }
+  }
+
+  private static URI getSegmentUploadURI(URI controllerURI)
+      throws URISyntaxException {
+    return FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI);
+  }
+
+  private static void addHeaders(SegmentGenerationJobSpec jobSpec, 
List<Header> headers) {
+    headers.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+        FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+    if (jobSpec.getPushJobSpec() != null) {
+      headers.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+          
String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+    }
+  }
+
   public static Map<String, String> getSegmentUriToTarPathMap(URI 
outputDirURI, PushJobSpec pushSpec,
       String[] files) {
     Map<String, String> segmentUriToTarPathMap = new HashMap<>();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
index 31d1ce8448..2b9237e5fd 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
@@ -46,6 +46,13 @@ public class PushJobSpec implements Serializable {
    * If true, and if segment was not already in the deep store, move it to 
deep store.
    */
   private boolean _copyToDeepStoreForMetadataPush;
+
+  /**
+   * Applicable for METADATA push type.
+   * If true, multiple segment metadata files are uploaded to the controller 
in a single call.
+   */
+  private boolean _batchMode;
+
   /**
    * Used in SegmentUriPushJobRunner, which is used to composite the segment 
uri to send to pinot controller.
    * The URI sends to controller is in the format 
${segmentUriPrefix}${segmentPath}${segmentUriSuffix}
@@ -148,4 +155,12 @@ public class PushJobSpec implements Serializable {
   public void setCopyToDeepStoreForMetadataPush(boolean 
copyToDeepStoreForMetadataPush) {
     _copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush;
   }
+
+  public boolean isBatchMode() {
+    return _batchMode;
+  }
+
+  public void setBatchMode(boolean batchMode) {
+    _batchMode = batchMode;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to