This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 409e5da  SegmentUploader impl (#6740)
409e5da is described below

commit 409e5da344c8ab4077fd318f6a3f801377b1b958
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Thu Apr 8 17:56:44 2021 -0700

    SegmentUploader impl (#6740)
    
    A default implementation for the SegmentUploader.
    This is a followup to #6718. With this PR, the SegmentWriter and 
SegmentUploader will be ready for use by other initiatives.
---
 pinot-controller/pom.xml                           |   5 +
 .../pinot/controller/util/FileIngestionHelper.java |  39 ++++-
 .../api/PinotIngestionRestletResourceTest.java     |  14 +-
 .../pinot/controller/helix/ControllerTest.java     |   5 +
 .../org/apache/pinot/core/util/IngestionUtils.java | 165 +++++++++++++++------
 .../apache/pinot/core/util}/SegmentPushUtils.java  |   2 +-
 .../pinot/core/util}/SegmentPushUtilsTest.java     |   2 +-
 pinot-distribution/pinot-assembly.xml              |   6 +
 pinot-integration-tests/pom.xml                    |   5 +
 .../SegmentWriterUploaderIntegrationTest.java      |  88 ++++++++---
 .../hadoop/HadoopSegmentMetadataPushJobRunner.java |   2 +-
 .../hadoop/HadoopSegmentTarPushJobRunner.java      |   2 +-
 .../hadoop/HadoopSegmentUriPushJobRunner.java      |   2 +-
 .../spark/SparkSegmentMetadataPushJobRunner.java   |   2 +-
 .../batch/spark/SparkSegmentTarPushJobRunner.java  |   2 +-
 .../batch/spark/SparkSegmentUriPushJobRunner.java  |   2 +-
 .../standalone/SegmentMetadataPushJobRunner.java   |   2 +-
 .../batch/standalone/SegmentTarPushJobRunner.java  |   2 +-
 .../batch/standalone/SegmentUriPushJobRunner.java  |   2 +-
 .../SegmentGenerationAndPushTaskExecutor.java      |   3 +-
 .../pinot-segment-uploader-default/pom.xml         |  53 +++++++
 .../segmentuploader/SegmentUploaderDefault.java    | 100 +++++++++++++
 pinot-plugins/pinot-segment-uploader/pom.xml       |  67 +++++++++
 .../filebased/FileBasedSegmentWriter.java          |   2 +-
 pinot-plugins/pom.xml                              |   1 +
 .../org/apache/pinot/spi/auth/AuthContext.java     |  34 +++++
 .../pinot/spi/ingestion/batch/BatchConfig.java     |  52 ++++++-
 .../spi/ingestion/batch/BatchConfigProperties.java |   3 +
 .../segment/uploader/SegmentUploader.java          |  11 +-
 .../pinot/spi/utils/IngestionConfigUtils.java      |  54 ++++++-
 30 files changed, 624 insertions(+), 105 deletions(-)

diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 52ede83..6d25fd6 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -73,6 +73,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-segment-uploader-default</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-yammer</artifactId>
       <scope>test</scope>
     </dependency>
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 169c41a..73615f4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -30,17 +30,23 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.controller.api.resources.SuccessResponse;
 import org.apache.pinot.core.util.IngestionUtils;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.auth.AuthContext;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfig;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.slf4j.Logger;
@@ -53,6 +59,7 @@ import org.slf4j.LoggerFactory;
  */
 public class FileIngestionHelper {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FileIngestionHelper.class);
+  private static final String SEGMENT_UPLOADER_CLASS = 
"org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault";
 
   private static final String WORKING_DIR_PREFIX = "working_dir";
   private static final String INPUT_DATA_DIR = "input_data_dir";
@@ -65,7 +72,7 @@ public class FileIngestionHelper {
   private final BatchConfig _batchConfig;
   private final URI _controllerUri;
   private final File _uploadDir;
-  private final String _authToken;
+  private final AuthContext _authContext;
 
   public FileIngestionHelper(TableConfig tableConfig, Schema schema, 
BatchConfig batchConfig, URI controllerUri,
       File uploadDir, String authToken) {
@@ -74,7 +81,7 @@ public class FileIngestionHelper {
     _batchConfig = batchConfig;
     _controllerUri = controllerUri;
     _uploadDir = uploadDir;
-    _authToken = authToken;
+    _authContext = new AuthContext(authToken);
   }
 
   /**
@@ -108,14 +115,25 @@ public class FileIngestionHelper {
         LOGGER.info("Copied multipart payload to local file: {}", 
inputDir.getAbsolutePath());
       }
 
-      // Get SegmentGeneratorConfig
+      // Update batch config map with values for file upload
       Map<String, String> batchConfigMapOverride = new 
HashMap<>(_batchConfig.getBatchConfigMap());
       batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI, 
inputFile.getAbsolutePath());
       batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI, 
outputDir.getAbsolutePath());
+      batchConfigMapOverride.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
_controllerUri.toString());
+      String segmentNamePostfixProp = String.format("%s.%s", 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
+          BatchConfigProperties.SEGMENT_NAME_POSTFIX);
+      if 
(StringUtils.isBlank(batchConfigMapOverride.get(segmentNamePostfixProp))) {
+        // Default segmentNameGenerator is SIMPLE.
+        // Adding this suffix to prevent creating a segment with the same name 
as an existing segment,
+        // if a file with the same time range is received again
+        batchConfigMapOverride.put(segmentNamePostfixProp, 
String.valueOf(System.currentTimeMillis()));
+      }
       BatchIngestionConfig batchIngestionConfigOverride =
           new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
               IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig),
               
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+      // Get SegmentGeneratorConfig
       SegmentGeneratorConfig segmentGeneratorConfig =
           IngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _schema, 
batchIngestionConfigOverride);
 
@@ -123,13 +141,20 @@ public class FileIngestionHelper {
       String segmentName = IngestionUtils.buildSegment(segmentGeneratorConfig);
       LOGGER.info("Built segment: {}", segmentName);
 
-      // Tar and push segment
+      // Tar segment dir
       File segmentTarFile =
           new File(segmentTarDir, segmentName + 
org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT);
       TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), 
segmentTarFile);
-      IngestionUtils
-          .uploadSegment(tableNameWithType, 
Lists.newArrayList(segmentTarFile), _controllerUri, _authToken);
-      LOGGER.info("Uploaded tar: {} to {}", segmentTarFile.getAbsolutePath(), 
_controllerUri);
+
+      // Upload segment
+      IngestionConfig ingestionConfigOverride = new 
IngestionConfig(batchIngestionConfigOverride, null, null, null);
+      TableConfig tableConfigOverride =
+          new 
TableConfigBuilder(_tableConfig.getTableType()).setTableName(_tableConfig.getTableName())
+              .setIngestionConfig(ingestionConfigOverride).build();
+      SegmentUploader segmentUploader = 
PluginManager.get().createInstance(SEGMENT_UPLOADER_CLASS);
+      segmentUploader.init(tableConfigOverride);
+      segmentUploader.uploadSegment(segmentTarFile.toURI(), _authContext);
+      LOGGER.info("Uploaded tar: {} to table: {}", 
segmentTarFile.getAbsolutePath(), tableNameWithType);
 
       return new SuccessResponse(
           "Successfully ingested file into table: " + tableNameWithType + " as 
segment: " + segmentName);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
index a688a00..044347c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
@@ -22,11 +22,6 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,19 +30,16 @@ import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.entity.mime.MultipartEntity;
 import org.apache.http.entity.mime.MultipartEntityBuilder;
 import org.apache.http.entity.mime.content.FileBody;
-import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -98,8 +90,8 @@ public class PinotIngestionRestletResourceTest extends 
ControllerTest {
 
     // ingest from file
     Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put("inputFormat", "csv");
-    batchConfigMap.put("recordReader.prop.delimiter", "|");
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "csv");
+    batchConfigMap.put(String.format("%s.delimiter", 
BatchConfigProperties.RECORD_READER_PROP_PREFIX), "|");
     
sendHttpPost(_controllerRequestURLBuilder.forIngestFromFile(TABLE_NAME_WITH_TYPE,
 batchConfigMap));
     segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE);
     Assert.assertEquals(segments.size(), 1);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 376b75b..478b784 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -531,6 +531,11 @@ public abstract class ControllerTest {
         
_controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
   }
 
+  protected void dropAllSegments(String tableName, TableType tableType) throws 
IOException {
+    sendDeleteRequest(
+        _controllerRequestURLBuilder.forSegmentDeleteAllAPI(tableName, 
tableType.toString()));
+  }
+
   protected void reloadOfflineTable(String tableName) throws IOException {
     sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, 
TableType.OFFLINE.name()), null);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
index baaac9c..5328cf9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
@@ -19,21 +19,17 @@
 package org.apache.pinot.core.util;
 
 import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.exception.HttpErrorStatusException;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.core.data.function.FunctionEvaluator;
 import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -42,6 +38,7 @@ import 
org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
 import 
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.auth.AuthContext;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
@@ -53,16 +50,20 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfig;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -70,13 +71,7 @@ import org.slf4j.LoggerFactory;
  */
 public final class IngestionUtils {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IngestionUtils.class);
-
-  private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
-      BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
-  private static final long DEFAULT_RETRY_WAIT_MS = 1000L;
-  private static final int DEFAULT_ATTEMPTS = 3;
-  private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+  private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
 
   private IngestionUtils() {
   }
@@ -143,9 +138,6 @@ public final class IngestionUtils {
 
     String rawTableName = 
TableNameBuilder.extractRawTableName(batchConfig.getTableNameWithType());
     String segmentNameGeneratorType = 
batchConfig.getSegmentNameGeneratorType();
-    if (segmentNameGeneratorType == null) {
-      segmentNameGeneratorType = DEFAULT_SEGMENT_NAME_GENERATOR_TYPE;
-    }
     switch (segmentNameGeneratorType) {
       case BatchConfigProperties.SegmentNameGeneratorType.FIXED:
         return new FixedSegmentNameGenerator(batchConfig.getSegmentName());
@@ -185,40 +177,117 @@ public final class IngestionUtils {
   }
 
   /**
-   * Uploads the segment tar files to the provided controller
+   * Uploads the segments from the provided segmentTar URIs to the table, 
using push details from the batchConfig
+   * @param tableNameWithType name of the table to upload the segment
+   * @param batchConfig batchConfig with details about push such as 
controllerURI, pushAttempts, pushParallelism, etc
+   * @param segmentTarURIs list of URI for the segment tar files
+   * @param authContext auth details required to upload the Pinot segment to 
controller
    */
-  public static void uploadSegment(String tableNameWithType, List<File> 
tarFiles, URI controllerUri,
-      final String authToken)
-      throws RetriableOperationException, AttemptsExceededException {
-    for (File tarFile : tarFiles) {
-      String fileName = tarFile.getName();
-      
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
-      String segmentName = fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length());
-
-      RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, 
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
-        try (InputStream inputStream = new FileInputStream(tarFile)) {
-          SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-              
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), 
segmentName, inputStream,
-                  FileUploadDownloadClient.makeAuthHeader(authToken),
-                  FileUploadDownloadClient.makeTableParam(tableNameWithType),
-                  FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
-          LOGGER.info("Response for pushing table {} segment {} - {}: {}", 
tableNameWithType, segmentName,
-              response.getStatusCode(), response.getResponse());
-          return true;
-        } catch (HttpErrorStatusException e) {
-          int statusCode = e.getStatusCode();
-          if (statusCode >= 500) {
-            LOGGER.warn("Caught temporary exception while pushing table: {} 
segment: {}, will retry", tableNameWithType,
-                segmentName, e);
-            return false;
-          } else {
-            throw e;
+  public static void uploadSegment(String tableNameWithType, BatchConfig 
batchConfig, List<URI> segmentTarURIs,
+      @Nullable AuthContext authContext)
+      throws Exception {
+
+    SegmentGenerationJobSpec segmentUploadSpec = 
generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext);
+
+    List<String> segmentTarURIStrs = 
segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList());
+    String pushMode = batchConfig.getPushMode();
+    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try {
+          SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, 
segmentTarURIStrs);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(String
+              .format("Caught exception while uploading segments. Push mode: 
TAR, segment tars: [%s]",
+                  segmentTarURIStrs), e);
+        }
+        break;
+      case URI:
+        List<String> segmentUris = new ArrayList<>();
+        try {
+          URI outputSegmentDirURI = null;
+          if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+            outputSegmentDirURI = 
URI.create(batchConfig.getOutputSegmentDirURI());
+          }
+          for (URI segmentTarURI : segmentTarURIs) {
+            URI updatedURI = 
SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI,
+                segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+                segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix());
+            segmentUris.add(updatedURI.toString());
           }
+          SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(String
+              .format("Caught exception while uploading segments. Push mode: 
URI, segment URIs: [%s]", segmentUris), e);
         }
-      });
+        break;
+      case METADATA:
+        try {
+          URI outputSegmentDirURI = null;
+          if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+            outputSegmentDirURI = 
URI.create(batchConfig.getOutputSegmentDirURI());
+          }
+          PinotFS outputFileFS = getOutputPinotFS(batchConfig, 
outputSegmentDirURI);
+          Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+              .getSegmentUriToTarPathMap(outputSegmentDirURI, 
segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+                  segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(), 
new String[]{segmentTarURIs.toString()});
+          SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, 
outputFileFS, segmentUriToTarPathMap);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(String
+              .format("Caught exception while uploading segments. Push mode: 
METADATA, segment URIs: [%s]",
+                  segmentTarURIStrs), e);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Unrecognized push mode - " + 
pushMode);
     }
   }
 
+  private static SegmentGenerationJobSpec generateSegmentUploadSpec(String 
tableName, BatchConfig batchConfig,
+      @Nullable AuthContext authContext) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI());
+    PinotClusterSpec[] pinotClusterSpecs = new 
PinotClusterSpec[]{pinotClusterSpec};
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(batchConfig.getPushAttempts());
+    pushJobSpec.setPushParallelism(batchConfig.getPushParallelism());
+    
pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis());
+    pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix());
+    pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix());
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    if (authContext != null && 
StringUtils.isNotBlank(authContext.getAuthToken())) {
+      spec.setAuthToken(authContext.getAuthToken());
+    }
+    return spec;
+  }
+
+  /**
+   * Creates an instance of the PinotFS using the fileURI and fs properties 
from BatchConfig
+   */
+  public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI) 
{
+    String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
+    if (fileURIScheme == null) {
+      fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+    }
+    if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {
+      registerPinotFS(fileURIScheme, batchConfig.getOutputFsClassName(),
+          
IngestionConfigUtils.getOutputFsProps(batchConfig.getBatchConfigMap()));
+    }
+    return PinotFSFactory.create(fileURIScheme);
+  }
+
+  private static void registerPinotFS(String fileURIScheme, String fsClass, 
PinotConfiguration fsProps) {
+    PinotFSFactory.register(fileURIScheme, fsClass, fsProps);
+  }
+
   /**
    * Extracts all fields required by the {@link 
org.apache.pinot.spi.data.readers.RecordExtractor} from the given TableConfig 
and Schema
    * Fields for ingestion come from 2 places:
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
 b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java
similarity index 99%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java
index 820f09f..4c254a6 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.ingestion.batch.common;
+package org.apache.pinot.core.util;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java
 b/pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java
rename to 
pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java
index ceecc58..19408f4 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pinot.plugin.ingestion.batch.common;
+package org.apache.pinot.core.util;
 
 import java.net.URI;
 import org.testng.Assert;
diff --git a/pinot-distribution/pinot-assembly.xml 
b/pinot-distribution/pinot-assembly.xml
index 3e1ed79..e117b48 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -140,6 +140,12 @@
       
<destName>plugins/pinot-segment-writer/pinot-segment-writer-file-based/pinot-segment-writer-file-based-${project.version}-shaded.jar</destName>
     </file>
     <!-- End Include Pinot Segment Writer Plugins-->
+    <!-- Start Include Pinot Segment Uploader Plugins-->
+    <file>
+      
<source>${pinot.root}/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/target/pinot-segment-uploader-default-${project.version}-shaded.jar</source>
+      
<destName>plugins/pinot-segment-uploader/pinot-segment-uploader-default/pinot-segment-uploader-default-${project.version}-shaded.jar</destName>
+    </file>
+    <!-- End Include Pinot Segment Uploader Plugins-->
     <!-- End Include Pinot Plugins-->
   </files>
   <fileSets>
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 8949e8b..c17bd5f 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -220,6 +220,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-segment-uploader-default</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-yammer</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
index 883b0a6..1811f72 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
@@ -23,12 +23,14 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault;
 import org.apache.pinot.plugin.segmentwriter.filebased.FileBasedSegmentWriter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -37,6 +39,7 @@ import 
org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
 import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -85,6 +88,7 @@ public class SegmentWriterUploaderIntegrationTest extends 
BaseClusterIntegration
     Map<String, String> batchConfigMap = new HashMap<>();
     batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_tarDir.getAbsolutePath());
     batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
+    batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
_controllerBaseApiUrl);
     return new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
         null, null);
   }
@@ -95,7 +99,7 @@ public class SegmentWriterUploaderIntegrationTest extends 
BaseClusterIntegration
    * Checks the number of segments created and total docs from the query
    */
   @Test
-  public void testFileBasedSegmentWriter()
+  public void testFileBasedSegmentWriterAndDefaultUploader()
       throws Exception {
 
     TableConfig offlineTableConfig = createOfflineTableConfig();
@@ -103,6 +107,8 @@ public class SegmentWriterUploaderIntegrationTest extends 
BaseClusterIntegration
 
     SegmentWriter segmentWriter = new FileBasedSegmentWriter();
     segmentWriter.init(offlineTableConfig, _schema);
+    SegmentUploader segmentUploader = new SegmentUploaderDefault();
+    segmentUploader.init(offlineTableConfig);
 
     GenericRow reuse = new GenericRow();
     long totalDocs = 0;
@@ -110,34 +116,36 @@ public class SegmentWriterUploaderIntegrationTest extends 
BaseClusterIntegration
       AvroRecordReader avroRecordReader = new AvroRecordReader();
       avroRecordReader.init(_avroFiles.get(i), null, null);
 
+      long numDocsInSegment = 0;
       while (avroRecordReader.hasNext()) {
         avroRecordReader.next(reuse);
         segmentWriter.collect(reuse);
+        numDocsInSegment++;
         totalDocs++;
       }
-      segmentWriter.flush();
+      // flush to segment
+      URI segmentTarURI = segmentWriter.flush();
+      // upload
+      segmentUploader.uploadSegment(segmentTarURI, null);
+
+      // check num segments
+      Assert.assertEquals(getNumSegments(), i + 1);
+      // check numDocs in latest segment
+      Assert.assertEquals(getNumDocsInLatestSegment(), numDocsInSegment);
+      // check totalDocs in query
+      checkTotalDocsInQuery(totalDocs);
     }
     segmentWriter.close();
 
-    // Manually upload
-    // TODO: once an implementation of SegmentUploader is available, use that 
instead
-    uploadSegments(_tableNameWithType, _tarDir);
+    dropAllSegments(_tableNameWithType, TableType.OFFLINE);
+    checkNumSegments(0);
 
+    // upload all together using dir
+    segmentUploader.uploadSegmentsFromDir(_tarDir.toURI(), null);
     // check num segments
     Assert.assertEquals(getNumSegments(), 3);
-    final long expectedDocs = totalDocs;
-    TestUtils.waitForCondition(new Function<Void, Boolean>() {
-      @Nullable
-      @Override
-      public Boolean apply(@Nullable Void aVoid) {
-        try {
-          return getTotalDocsFromQuery() == expectedDocs;
-        } catch (Exception e) {
-          LOGGER.error("Caught exception when getting totalDocs from query: 
{}", e.getMessage());
-          return null;
-        }
-      }
-    }, 100L, 120_000, "Failed to load " + expectedDocs + " documents", true);
+    // check totalDocs in query
+    checkTotalDocsInQuery(totalDocs);
 
     dropOfflineTable(_tableNameWithType);
   }
@@ -156,6 +164,50 @@ public class SegmentWriterUploaderIntegrationTest extends 
BaseClusterIntegration
     return response.get("resultTable").get("rows").get(0).get(0).asInt();
   }
 
+  private int getNumDocsInLatestSegment()
+      throws IOException {
+    String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.
+        forSegmentListAPIWithTableType(_tableNameWithType, 
TableType.OFFLINE.toString()));
+    JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
+    JsonNode segments = array.get(0).get("OFFLINE");
+    String segmentName = segments.get(segments.size() - 1).asText();
+
+    jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.
+        forSegmentMetadata(_tableNameWithType, segmentName));
+    JsonNode metadata = JsonUtils.stringToJsonNode(jsonOutputStr);
+    return metadata.get("segment.total.docs").asInt();
+  }
+
+  private void checkTotalDocsInQuery(long expectedTotalDocs) {
+    TestUtils.waitForCondition(new Function<Void, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          return getTotalDocsFromQuery() == expectedTotalDocs;
+        } catch (Exception e) {
+          LOGGER.error("Caught exception when getting totalDocs from query: 
{}", e.getMessage());
+          return null;
+        }
+      }
+    }, 100L, 120_000, "Failed to load " + expectedTotalDocs + " documents", 
true);
+  }
+
+  private void checkNumSegments(int expectedNumSegments) {
+    TestUtils.waitForCondition(new Function<Void, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          return getNumSegments() == expectedNumSegments;
+        } catch (Exception e) {
+          LOGGER.error("Caught exception when getting num segments: {}", 
e.getMessage());
+          return null;
+        }
+      }
+    }, 100L, 120_000, "Failed to load get num segments", true);
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
index 65ae7c4..0d9d5ac 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
@@ -26,7 +26,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
index 05aa8f9..d29a3f4 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
@@ -25,7 +25,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
index f451c2c..b563d26 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
@@ -26,7 +26,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
index 5b07db4..bb2ca79 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
@@ -26,7 +26,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
index 1757652..b398bee 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
index 824e05a..2926a73 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
index cad8cf6..f2b2e8d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
@@ -24,7 +24,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
index aa1eee4..641fbdf 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
@@ -25,7 +25,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
index 3172dac..d9a0db7 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
@@ -25,7 +25,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index a1d9a38..313a219 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -30,9 +30,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.util.SegmentPushUtils;
 import org.apache.pinot.minion.MinionContext;
 import 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
 import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -118,6 +118,7 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
 
       resultBuilder.setSegmentName(segmentName);
       // Segment push task
+      // TODO: Make this use SegmentUploader
       pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs, 
outputSegmentTarURI);
       resultBuilder.setSucceed(true);
     } catch (Exception e) {
diff --git 
a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml 
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml
new file mode 100644
index 0000000..7870427
--- /dev/null
+++ 
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>pinot-segment-uploader</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pinot-segment-uploader-default</artifactId>
+  <name>Pinot Segment Uploader Default</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <phase.prop>package</phase.prop>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git 
a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
 
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
new file mode 100644
index 0000000..5c71575
--- /dev/null
+++ 
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.segmentuploader;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.core.util.IngestionUtils;
+import org.apache.pinot.spi.auth.AuthContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Default implementation of {@link SegmentUploader} with support for all push 
modes
+ * The configs for push are fetched from batchConfigMaps of tableConfig
+ */
+public class SegmentUploaderDefault implements SegmentUploader {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentUploaderDefault.class);
+
+  private String _tableNameWithType;
+  private BatchConfig _batchConfig;
+
+  @Override
+  public void init(TableConfig tableConfig)
+      throws Exception {
+    _tableNameWithType = tableConfig.getTableName();
+
+    Preconditions.checkState(
+        tableConfig.getIngestionConfig() != null && 
tableConfig.getIngestionConfig().getBatchIngestionConfig() != null
+            && CollectionUtils
+            
.isNotEmpty(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()),
+        "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps 
in tableConfig for table: %s",
+        _tableNameWithType);
+    Preconditions
+        
.checkState(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().size()
 == 1,
+            "batchConfigMaps must contain only 1 BatchConfig for table: %s", 
_tableNameWithType);
+    _batchConfig = new BatchConfig(_tableNameWithType,
+        
tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().get(0));
+
+    
Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getPushControllerURI()),
+        "Must provide: %s in batchConfigs for table: %s", 
BatchConfigProperties.PUSH_CONTROLLER_URI,
+        _tableNameWithType);
+
+    LOGGER.info("Initialized {} for table: {}", 
SegmentUploaderDefault.class.getName(), _tableNameWithType);
+  }
+
+  @Override
+  public void uploadSegment(URI segmentTarURI, @Nullable AuthContext 
authContext)
+      throws Exception {
+    IngestionUtils
+        .uploadSegment(_tableNameWithType, _batchConfig, 
Collections.singletonList(segmentTarURI), authContext);
+    LOGGER.info("Successfully uploaded segment: {} to table: {}", 
segmentTarURI, _tableNameWithType);
+  }
+
+  @Override
+  public void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext 
authContext)
+      throws Exception {
+
+    List<URI> segmentTarURIs = new ArrayList<>();
+    PinotFS outputPinotFS = IngestionUtils.getOutputPinotFS(_batchConfig, 
segmentDir);
+    String[] filePaths = outputPinotFS.listFiles(segmentDir, true);
+    for (String filePath : filePaths) {
+      URI uri = URI.create(filePath);
+      if (!outputPinotFS.isDirectory(uri) && 
filePath.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        segmentTarURIs.add(uri);
+      }
+    }
+    IngestionUtils.uploadSegment(_tableNameWithType, _batchConfig, 
segmentTarURIs, authContext);
+    LOGGER.info("Successfully uploaded segments: {} to table: {}", 
segmentTarURIs, _tableNameWithType);
+  }
+}
diff --git a/pinot-plugins/pinot-segment-uploader/pom.xml 
b/pinot-plugins/pinot-segment-uploader/pom.xml
new file mode 100644
index 0000000..e5e05fb
--- /dev/null
+++ b/pinot-plugins/pinot-segment-uploader/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>pinot-plugins</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>pinot-segment-uploader</artifactId>
+  <packaging>pom</packaging>
+  <name>Pinot Segment Uploader</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/../..</pinot.root>
+    <plugin.type>pinot-segment-uploader</plugin.type>
+  </properties>
+
+  <modules>
+    <module>pinot-segment-uploader-default</module>
+  </modules>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-spi</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git 
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
index abae7c2..e1c090c 100644
--- 
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
+++ 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
@@ -145,7 +145,7 @@ public class FileBasedSegmentWriter implements 
SegmentWriter {
    * Successful completion of segment will return the segment URI.
    * The buffer will be reset and ready to accept further records via 
<code>collect()</code>
    *
-   * If an exception is throw, the buffer will not be reset
+   * If an exception is thrown, the buffer will not be reset
    * and so, <code>flush()</code> can be invoked repeatedly in a retry loop.
    * If a successful invocation is not achieved,<code>close()</code> followed 
by <code>init</code> will have to be
    * called in order to reset the buffer and resume record writing.
diff --git a/pinot-plugins/pom.xml b/pinot-plugins/pom.xml
index bd5efbc..238e02b 100644
--- a/pinot-plugins/pom.xml
+++ b/pinot-plugins/pom.xml
@@ -47,6 +47,7 @@
     <module>pinot-minion-tasks</module>
     <module>pinot-metrics</module>
     <module>pinot-segment-writer</module>
+    <module>pinot-segment-uploader</module>
   </modules>
 
   <dependencies>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
new file mode 100644
index 0000000..5a9798c
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.auth;
+
+/**
+ * Container for all auth related info
+ */
+public class AuthContext {
+  private final String _authToken;
+
+  public AuthContext(String authToken) {
+    _authToken = authToken;
+  }
+
+  public String getAuthToken() {
+    return _authToken;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index aed5eec..4495c85 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -52,6 +52,15 @@ public class BatchConfig {
   private final boolean _excludeSequenceId;
   private final String _sequenceId;
 
+  private final String _pushMode;
+  private final int _pushAttempts;
+  private final int _pushParallelism;
+  private final long _pushIntervalRetryMillis;
+  private final String _pushSegmentURIPrefix;
+  private final String _pushSegmentURISuffix;
+  private final String _pushControllerURI;
+  private final String _outputSegmentDirURI;
+
   public BatchConfig(String tableNameWithType, Map<String, String> 
batchConfigsMap) {
     _batchConfigMap = batchConfigsMap;
     _tableNameWithType = tableNameWithType;
@@ -78,7 +87,7 @@ public class BatchConfig {
     _recordReaderProps = IngestionConfigUtils
         .extractPropsMatchingPrefix(batchConfigsMap, 
BatchConfigProperties.RECORD_READER_PROP_PREFIX);
 
-    _segmentNameGeneratorType = 
batchConfigsMap.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE);
+    _segmentNameGeneratorType = 
IngestionConfigUtils.getSegmentNameGeneratorType(batchConfigsMap);
     _segmentNameGeneratorConfigs = IngestionConfigUtils
         .extractPropsMatchingPrefix(batchConfigsMap, 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX);
     Map<String, String> segmentNameGeneratorProps = 
IngestionConfigUtils.getSegmentNameGeneratorProps(batchConfigsMap);
@@ -87,6 +96,15 @@ public class BatchConfig {
     _segmentNamePostfix = 
segmentNameGeneratorProps.get(BatchConfigProperties.SEGMENT_NAME_POSTFIX);
     _excludeSequenceId = 
Boolean.parseBoolean(segmentNameGeneratorProps.get(BatchConfigProperties.EXCLUDE_SEQUENCE_ID));
     _sequenceId = batchConfigsMap.get(BatchConfigProperties.SEQUENCE_ID);
+
+    _pushMode = IngestionConfigUtils.getPushMode(batchConfigsMap);
+    _pushAttempts = IngestionConfigUtils.getPushAttempts(batchConfigsMap);
+    _pushParallelism = 
IngestionConfigUtils.getPushParallelism(batchConfigsMap);
+    _pushIntervalRetryMillis = 
IngestionConfigUtils.getPushRetryIntervalMillis(batchConfigsMap);
+    _pushSegmentURIPrefix = 
batchConfigsMap.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX);
+    _pushSegmentURISuffix = 
batchConfigsMap.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX);
+    _pushControllerURI = 
batchConfigsMap.get(BatchConfigProperties.PUSH_CONTROLLER_URI);
+    _outputSegmentDirURI = 
batchConfigsMap.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI);
   }
 
   public String getTableNameWithType() {
@@ -165,6 +183,38 @@ public class BatchConfig {
     return _sequenceId;
   }
 
+  public String getPushMode() {
+    return _pushMode;
+  }
+
+  public int getPushAttempts() {
+    return _pushAttempts;
+  }
+
+  public int getPushParallelism() {
+    return _pushParallelism;
+  }
+
+  public long getPushIntervalRetryMillis() {
+    return _pushIntervalRetryMillis;
+  }
+
+  public String getPushSegmentURIPrefix() {
+    return _pushSegmentURIPrefix;
+  }
+
+  public String getPushSegmentURISuffix() {
+    return _pushSegmentURISuffix;
+  }
+
+  public String getPushControllerURI() {
+    return _pushControllerURI;
+  }
+
+  public String getOutputSegmentDirURI() {
+    return _outputSegmentDirURI;
+  }
+
   public Map<String, String> getBatchConfigMap() {
     return _batchConfigMap;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index b208814..a82553f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -50,6 +50,9 @@ public class BatchConfigProperties {
   public static final String OVERWRITE_OUTPUT = "overwriteOutput";
   public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
   public static final String PUSH_MODE = "push.mode";
+  public static final String PUSH_ATTEMPTS = "push.attempts";
+  public static final String PUSH_PARALLELISM = "push.parallelism";
+  public static final String PUSH_RETRY_INTERVAL_MILLIS = 
"push.retry.interval.millis";
   public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
   public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
   public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
index 64d6cfa..995b597 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
@@ -19,7 +19,9 @@
 package org.apache.pinot.spi.ingestion.segment.uploader;
 
 import java.net.URI;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.auth.AuthContext;
 import org.apache.pinot.spi.config.table.TableConfig;
 
 
@@ -39,14 +41,17 @@ public interface SegmentUploader {
   /**
    * Uploads the segment tar file to the cluster
    * @param segmentTarFile URI of segment tar file
+   * @param authContext auth details required to upload pinot segment to 
controller
    */
-  void uploadSegment(URI segmentTarFile)
+  void uploadSegment(URI segmentTarFile, @Nullable AuthContext authContext)
       throws Exception;
 
   /**
-   * Uploads the segments from the segmentDir to the cluster
+   * Uploads the segments from the segmentDir to the cluster.
+   * Looks for segmentTar files recursively, with suffix .tar.gz
    * @param segmentDir URI of directory containing segment tar files
+   * @param authContext auth details required to upload pinot segment to 
controller
    */
-  void uploadSegments(URI segmentDir)
+  void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext authContext)
       throws Exception;
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index a1d7530..74f68a9 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
@@ -34,7 +35,12 @@ import 
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
  */
 public final class IngestionConfigUtils {
   public static final String DOT_SEPARATOR = ".";
+  private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
+      BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
   private static final String DEFAULT_PUSH_MODE = "tar";
+  private static final int DEFAULT_PUSH_ATTEMPTS = 5;
+  private static final int DEFAULT_PUSH_PARALLELISM = 1;
+  private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
 
   /**
    * Fetches the streamConfig from the given realtime table.
@@ -158,11 +164,51 @@ public final class IngestionConfigUtils {
     return props;
   }
 
+  /**
+   * Extracts the segment name generator type from the batchConfigMap, or 
returns default value if not found
+   */
+  public static String getSegmentNameGeneratorType(Map<String, String> 
batchConfigMap) {
+    return batchConfigMap
+        .getOrDefault(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, 
DEFAULT_SEGMENT_NAME_GENERATOR_TYPE);
+  }
+
+  /**
+   * Extracts the push mode from the batchConfigMap, or returns default value 
if not found
+   */
   public static String getPushMode(Map<String, String> batchConfigMap) {
-    String pushMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE);
-    if (pushMode == null) {
-      pushMode = DEFAULT_PUSH_MODE;
+    return batchConfigMap.getOrDefault(BatchConfigProperties.PUSH_MODE, 
DEFAULT_PUSH_MODE);
+  }
+
+  /**
+   * Extracts the push attempts from the batchConfigMap, or returns default 
value if not found
+   */
+  public static int getPushAttempts(Map<String, String> batchConfigMap) {
+    String pushAttempts = 
batchConfigMap.get(BatchConfigProperties.PUSH_ATTEMPTS);
+    if (StringUtils.isNumeric(pushAttempts)) {
+      return Integer.parseInt(pushAttempts);
+    }
+    return DEFAULT_PUSH_ATTEMPTS;
+  }
+
+  /**
+   * Extracts the push parallelism from the batchConfigMap, or returns default 
value if not found
+   */
+  public static int getPushParallelism(Map<String, String> batchConfigMap) {
+    String pushParallelism = 
batchConfigMap.get(BatchConfigProperties.PUSH_PARALLELISM);
+    if (StringUtils.isNumeric(pushParallelism)) {
+      return Integer.parseInt(pushParallelism);
+    }
+    return DEFAULT_PUSH_PARALLELISM;
+  }
+
+  /**
+   * Extracts the push return interval millis from the batchConfigMap, or 
returns default value if not found
+   */
+  public static long getPushRetryIntervalMillis(Map<String, String> 
batchConfigMap) {
+    String pushRetryIntervalMillis = 
batchConfigMap.get(BatchConfigProperties.PUSH_RETRY_INTERVAL_MILLIS);
+    if (StringUtils.isNumeric(pushRetryIntervalMillis)) {
+      return Long.parseLong(pushRetryIntervalMillis);
     }
-    return pushMode;
+    return DEFAULT_PUSH_RETRY_INTERVAL_MILLIS;
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to