This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 409e5da SegmentUploader impl (#6740) 409e5da is described below commit 409e5da344c8ab4077fd318f6a3f801377b1b958 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Thu Apr 8 17:56:44 2021 -0700 SegmentUploader impl (#6740) A default implementation for the SegmentUploader. This is a followup to #6718. With this PR, the SegmentWriter and SegmentUploader will be ready for use by other initiatives. --- pinot-controller/pom.xml | 5 + .../pinot/controller/util/FileIngestionHelper.java | 39 ++++- .../api/PinotIngestionRestletResourceTest.java | 14 +- .../pinot/controller/helix/ControllerTest.java | 5 + .../org/apache/pinot/core/util/IngestionUtils.java | 165 +++++++++++++++------ .../apache/pinot/core/util}/SegmentPushUtils.java | 2 +- .../pinot/core/util}/SegmentPushUtilsTest.java | 2 +- pinot-distribution/pinot-assembly.xml | 6 + pinot-integration-tests/pom.xml | 5 + .../SegmentWriterUploaderIntegrationTest.java | 88 ++++++++--- .../hadoop/HadoopSegmentMetadataPushJobRunner.java | 2 +- .../hadoop/HadoopSegmentTarPushJobRunner.java | 2 +- .../hadoop/HadoopSegmentUriPushJobRunner.java | 2 +- .../spark/SparkSegmentMetadataPushJobRunner.java | 2 +- .../batch/spark/SparkSegmentTarPushJobRunner.java | 2 +- .../batch/spark/SparkSegmentUriPushJobRunner.java | 2 +- .../standalone/SegmentMetadataPushJobRunner.java | 2 +- .../batch/standalone/SegmentTarPushJobRunner.java | 2 +- .../batch/standalone/SegmentUriPushJobRunner.java | 2 +- .../SegmentGenerationAndPushTaskExecutor.java | 3 +- .../pinot-segment-uploader-default/pom.xml | 53 +++++++ .../segmentuploader/SegmentUploaderDefault.java | 100 +++++++++++++ pinot-plugins/pinot-segment-uploader/pom.xml | 67 +++++++++ .../filebased/FileBasedSegmentWriter.java | 2 +- pinot-plugins/pom.xml | 1 + .../org/apache/pinot/spi/auth/AuthContext.java | 34 +++++ .../pinot/spi/ingestion/batch/BatchConfig.java | 52 ++++++- 
.../spi/ingestion/batch/BatchConfigProperties.java | 3 + .../segment/uploader/SegmentUploader.java | 11 +- .../pinot/spi/utils/IngestionConfigUtils.java | 54 ++++++- 30 files changed, 624 insertions(+), 105 deletions(-) diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml index 52ede83..6d25fd6 100644 --- a/pinot-controller/pom.xml +++ b/pinot-controller/pom.xml @@ -73,6 +73,11 @@ </dependency> <dependency> <groupId>org.apache.pinot</groupId> + <artifactId>pinot-segment-uploader-default</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> <artifactId>pinot-yammer</artifactId> <scope>test</scope> </dependency> diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java index 169c41a..73615f4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java @@ -30,17 +30,23 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.controller.api.resources.SuccessResponse; import org.apache.pinot.core.util.IngestionUtils; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.auth.AuthContext; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.BatchConfig; import 
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.slf4j.Logger; @@ -53,6 +59,7 @@ import org.slf4j.LoggerFactory; */ public class FileIngestionHelper { private static final Logger LOGGER = LoggerFactory.getLogger(FileIngestionHelper.class); + private static final String SEGMENT_UPLOADER_CLASS = "org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault"; private static final String WORKING_DIR_PREFIX = "working_dir"; private static final String INPUT_DATA_DIR = "input_data_dir"; @@ -65,7 +72,7 @@ public class FileIngestionHelper { private final BatchConfig _batchConfig; private final URI _controllerUri; private final File _uploadDir; - private final String _authToken; + private final AuthContext _authContext; public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, URI controllerUri, File uploadDir, String authToken) { @@ -74,7 +81,7 @@ public class FileIngestionHelper { _batchConfig = batchConfig; _controllerUri = controllerUri; _uploadDir = uploadDir; - _authToken = authToken; + _authContext = new AuthContext(authToken); } /** @@ -108,14 +115,25 @@ public class FileIngestionHelper { LOGGER.info("Copied multipart payload to local file: {}", inputDir.getAbsolutePath()); } - // Get SegmentGeneratorConfig + // Update batch config map with values for file upload Map<String, String> batchConfigMapOverride = new HashMap<>(_batchConfig.getBatchConfigMap()); batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI, inputFile.getAbsolutePath()); batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir.getAbsolutePath()); + 
batchConfigMapOverride.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _controllerUri.toString()); + String segmentNamePostfixProp = String.format("%s.%s", BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX, + BatchConfigProperties.SEGMENT_NAME_POSTFIX); + if (StringUtils.isBlank(batchConfigMapOverride.get(segmentNamePostfixProp))) { + // Default segmentNameGenerator is SIMPLE. + // Adding this suffix to prevent creating a segment with the same name as an existing segment, + // if a file with the same time range is received again + batchConfigMapOverride.put(segmentNamePostfixProp, String.valueOf(System.currentTimeMillis())); + } BatchIngestionConfig batchIngestionConfigOverride = new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride), IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig), IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); + + // Get SegmentGeneratorConfig SegmentGeneratorConfig segmentGeneratorConfig = IngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _schema, batchIngestionConfigOverride); @@ -123,13 +141,20 @@ public class FileIngestionHelper { String segmentName = IngestionUtils.buildSegment(segmentGeneratorConfig); LOGGER.info("Built segment: {}", segmentName); - // Tar and push segment + // Tar segment dir File segmentTarFile = new File(segmentTarDir, segmentName + org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT); TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), segmentTarFile); - IngestionUtils - .uploadSegment(tableNameWithType, Lists.newArrayList(segmentTarFile), _controllerUri, _authToken); - LOGGER.info("Uploaded tar: {} to {}", segmentTarFile.getAbsolutePath(), _controllerUri); + + // Upload segment + IngestionConfig ingestionConfigOverride = new IngestionConfig(batchIngestionConfigOverride, null, null, null); + TableConfig tableConfigOverride = + new 
TableConfigBuilder(_tableConfig.getTableType()).setTableName(_tableConfig.getTableName()) + .setIngestionConfig(ingestionConfigOverride).build(); + SegmentUploader segmentUploader = PluginManager.get().createInstance(SEGMENT_UPLOADER_CLASS); + segmentUploader.init(tableConfigOverride); + segmentUploader.uploadSegment(segmentTarFile.toURI(), _authContext); + LOGGER.info("Uploaded tar: {} to table: {}", segmentTarFile.getAbsolutePath(), tableNameWithType); return new SuccessResponse( "Successfully ingested file into table: " + tableNameWithType + " as segment: " + segmentName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java index a688a00..044347c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java @@ -22,11 +22,6 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.net.HttpURLConnection; -import java.net.URL; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,19 +30,16 @@ import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.entity.mime.MultipartEntity; import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.entity.mime.content.FileBody; -import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.pinot.controller.helix.ControllerTest; import 
org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -98,8 +90,8 @@ public class PinotIngestionRestletResourceTest extends ControllerTest { // ingest from file Map<String, String> batchConfigMap = new HashMap<>(); - batchConfigMap.put("inputFormat", "csv"); - batchConfigMap.put("recordReader.prop.delimiter", "|"); + batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "csv"); + batchConfigMap.put(String.format("%s.delimiter", BatchConfigProperties.RECORD_READER_PROP_PREFIX), "|"); sendHttpPost(_controllerRequestURLBuilder.forIngestFromFile(TABLE_NAME_WITH_TYPE, batchConfigMap)); segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE); Assert.assertEquals(segments.size(), 1); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 376b75b..478b784 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -531,6 +531,11 @@ public abstract class ControllerTest { _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName))); } + protected void dropAllSegments(String tableName, TableType tableType) throws IOException { + sendDeleteRequest( + _controllerRequestURLBuilder.forSegmentDeleteAllAPI(tableName, tableType.toString())); + } + protected void reloadOfflineTable(String tableName) throws IOException { 
sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE.name()), null); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java index baaac9c..5328cf9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java @@ -19,21 +19,17 @@ package org.apache.pinot.core.util; import com.google.common.base.Preconditions; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.net.URI; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.pinot.common.exception.HttpErrorStatusException; -import org.apache.pinot.common.utils.FileUploadDownloadClient; -import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.core.data.function.FunctionEvaluator; import org.apache.pinot.core.data.function.FunctionEvaluatorFactory; import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; @@ -42,6 +38,7 @@ import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator; +import org.apache.pinot.spi.auth.AuthContext; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; @@ -53,16 +50,20 @@ import 
org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReaderFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.LocalPinotFS; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.BatchConfig; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; -import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; +import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.apache.pinot.spi.utils.retry.RetriableOperationException; -import org.apache.pinot.spi.utils.retry.RetryPolicies; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -70,13 +71,7 @@ import org.slf4j.LoggerFactory; */ public final class IngestionUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(IngestionUtils.class); - - private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE = - BatchConfigProperties.SegmentNameGeneratorType.SIMPLE; - private static final long DEFAULT_RETRY_WAIT_MS = 1000L; - private static final int DEFAULT_ATTEMPTS = 3; - private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient(); + private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS(); private IngestionUtils() { } @@ -143,9 +138,6 @@ public final class IngestionUtils { String rawTableName = TableNameBuilder.extractRawTableName(batchConfig.getTableNameWithType()); 
String segmentNameGeneratorType = batchConfig.getSegmentNameGeneratorType(); - if (segmentNameGeneratorType == null) { - segmentNameGeneratorType = DEFAULT_SEGMENT_NAME_GENERATOR_TYPE; - } switch (segmentNameGeneratorType) { case BatchConfigProperties.SegmentNameGeneratorType.FIXED: return new FixedSegmentNameGenerator(batchConfig.getSegmentName()); @@ -185,40 +177,117 @@ public final class IngestionUtils { } /** - * Uploads the segment tar files to the provided controller + * Uploads the segments from the provided segmentTar URIs to the table, using push details from the batchConfig + * @param tableNameWithType name of the table to upload the segment + * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc + * @param segmentTarURIs list of URI for the segment tar files + * @param authContext auth details required to upload the Pinot segment to controller */ - public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, - final String authToken) - throws RetriableOperationException, AttemptsExceededException { - for (File tarFile : tarFiles) { - String fileName = tarFile.getName(); - Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); - String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); - - RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> { - try (InputStream inputStream = new FileInputStream(tarFile)) { - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream, - FileUploadDownloadClient.makeAuthHeader(authToken), - FileUploadDownloadClient.makeTableParam(tableNameWithType), - FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName, - 
response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int statusCode = e.getStatusCode(); - if (statusCode >= 500) { - LOGGER.warn("Caught temporary exception while pushing table: {} segment: {}, will retry", tableNameWithType, - segmentName, e); - return false; - } else { - throw e; + public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List<URI> segmentTarURIs, + @Nullable AuthContext authContext) + throws Exception { + + SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext); + + List<String> segmentTarURIStrs = segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()); + String pushMode = batchConfig.getPushMode(); + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, segmentTarURIStrs); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(String + .format("Caught exception while uploading segments. Push mode: TAR, segment tars: [%s]", + segmentTarURIStrs), e); + } + break; + case URI: + List<String> segmentUris = new ArrayList<>(); + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); + } + for (URI segmentTarURI : segmentTarURIs) { + URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI, + segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); } + SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(String + .format("Caught exception while uploading segments. 
Push mode: URI, segment URIs: [%s]", segmentUris), e); } - }); + break; + case METADATA: + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); + } + PinotFS outputFileFS = getOutputPinotFS(batchConfig, outputSegmentDirURI); + Map<String, String> segmentUriToTarPathMap = SegmentPushUtils + .getSegmentUriToTarPathMap(outputSegmentDirURI, segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(), new String[]{segmentTarURIs.toString()}); + SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, outputFileFS, segmentUriToTarPathMap); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(String + .format("Caught exception while uploading segments. Push mode: METADATA, segment URIs: [%s]", + segmentTarURIStrs), e); + } + break; + default: + throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode); } } + private static SegmentGenerationJobSpec generateSegmentUploadSpec(String tableName, BatchConfig batchConfig, + @Nullable AuthContext authContext) { + + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(tableName); + + PinotClusterSpec pinotClusterSpec = new PinotClusterSpec(); + pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI()); + PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec}; + + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setPushAttempts(batchConfig.getPushAttempts()); + pushJobSpec.setPushParallelism(batchConfig.getPushParallelism()); + pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis()); + pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix()); + pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix()); + + SegmentGenerationJobSpec spec = new 
SegmentGenerationJobSpec(); + spec.setPushJobSpec(pushJobSpec); + spec.setTableSpec(tableSpec); + spec.setPinotClusterSpecs(pinotClusterSpecs); + if (authContext != null && StringUtils.isNotBlank(authContext.getAuthToken())) { + spec.setAuthToken(authContext.getAuthToken()); + } + return spec; + } + + /** + * Creates an instance of the PinotFS using the fileURI and fs properties from BatchConfig + */ + public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI) { + String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme(); + if (fileURIScheme == null) { + fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) { + registerPinotFS(fileURIScheme, batchConfig.getOutputFsClassName(), + IngestionConfigUtils.getOutputFsProps(batchConfig.getBatchConfigMap())); + } + return PinotFSFactory.create(fileURIScheme); + } + + private static void registerPinotFS(String fileURIScheme, String fsClass, PinotConfiguration fsProps) { + PinotFSFactory.register(fileURIScheme, fsClass, fsProps); + } + /** * Extracts all fields required by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given TableConfig and Schema * Fields for ingestion come from 2 places: diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java similarity index 99% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java rename to pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java index 820f09f..4c254a6 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.ingestion.batch.common; +package org.apache.pinot.core.util; import com.google.common.base.Preconditions; import java.io.File; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java rename to pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java index ceecc58..19408f4 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.pinot.plugin.ingestion.batch.common; +package org.apache.pinot.core.util; import java.net.URI; import org.testng.Assert; diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml index 3e1ed79..e117b48 100644 --- a/pinot-distribution/pinot-assembly.xml +++ b/pinot-distribution/pinot-assembly.xml @@ -140,6 +140,12 @@ <destName>plugins/pinot-segment-writer/pinot-segment-writer-file-based/pinot-segment-writer-file-based-${project.version}-shaded.jar</destName> </file> <!-- End Include Pinot Segment Writer Plugins--> + <!-- Start Include Pinot Segment Uploader Plugins--> + <file> + <source>${pinot.root}/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/target/pinot-segment-uploader-default-${project.version}-shaded.jar</source> + <destName>plugins/pinot-segment-uploader/pinot-segment-uploader-default/pinot-segment-uploader-default-${project.version}-shaded.jar</destName> + </file> + <!-- End Include Pinot Segment Uploader Plugins--> <!-- End Include Pinot Plugins--> </files> <fileSets> diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml index 8949e8b..c17bd5f 100644 --- a/pinot-integration-tests/pom.xml +++ b/pinot-integration-tests/pom.xml @@ -220,6 +220,11 @@ </dependency> <dependency> <groupId>org.apache.pinot</groupId> + <artifactId>pinot-segment-uploader-default</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> <artifactId>pinot-yammer</artifactId> <version>${project.version}</version> </dependency> diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java index 883b0a6..1811f72 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java +++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java @@ -23,12 +23,14 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader; +import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault; import org.apache.pinot.plugin.segmentwriter.filebased.FileBasedSegmentWriter; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -37,6 +39,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -85,6 +88,7 @@ public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegration Map<String, String> batchConfigMap = new HashMap<>(); batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, _tarDir.getAbsolutePath()); batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false"); + batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _controllerBaseApiUrl); return new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), null, null, null); } @@ -95,7 +99,7 @@ public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegration * Checks the number of segments created and total docs from the query */ @Test - public void testFileBasedSegmentWriter() + 
public void testFileBasedSegmentWriterAndDefaultUploader() throws Exception { TableConfig offlineTableConfig = createOfflineTableConfig(); @@ -103,6 +107,8 @@ public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegration SegmentWriter segmentWriter = new FileBasedSegmentWriter(); segmentWriter.init(offlineTableConfig, _schema); + SegmentUploader segmentUploader = new SegmentUploaderDefault(); + segmentUploader.init(offlineTableConfig); GenericRow reuse = new GenericRow(); long totalDocs = 0; @@ -110,34 +116,36 @@ public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegration AvroRecordReader avroRecordReader = new AvroRecordReader(); avroRecordReader.init(_avroFiles.get(i), null, null); + long numDocsInSegment = 0; while (avroRecordReader.hasNext()) { avroRecordReader.next(reuse); segmentWriter.collect(reuse); + numDocsInSegment++; totalDocs++; } - segmentWriter.flush(); + // flush to segment + URI segmentTarURI = segmentWriter.flush(); + // upload + segmentUploader.uploadSegment(segmentTarURI, null); + + // check num segments + Assert.assertEquals(getNumSegments(), i + 1); + // check numDocs in latest segment + Assert.assertEquals(getNumDocsInLatestSegment(), numDocsInSegment); + // check totalDocs in query + checkTotalDocsInQuery(totalDocs); } segmentWriter.close(); - // Manually upload - // TODO: once an implementation of SegmentUploader is available, use that instead - uploadSegments(_tableNameWithType, _tarDir); + dropAllSegments(_tableNameWithType, TableType.OFFLINE); + checkNumSegments(0); + // upload all together using dir + segmentUploader.uploadSegmentsFromDir(_tarDir.toURI(), null); // check num segments Assert.assertEquals(getNumSegments(), 3); - final long expectedDocs = totalDocs; - TestUtils.waitForCondition(new Function<Void, Boolean>() { - @Nullable - @Override - public Boolean apply(@Nullable Void aVoid) { - try { - return getTotalDocsFromQuery() == expectedDocs; - } catch (Exception e) { - LOGGER.error("Caught 
exception when getting totalDocs from query: {}", e.getMessage()); - return null; - } - } - }, 100L, 120_000, "Failed to load " + expectedDocs + " documents", true); + // check totalDocs in query + checkTotalDocsInQuery(totalDocs); dropOfflineTable(_tableNameWithType); } @@ -156,6 +164,50 @@ public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegration return response.get("resultTable").get("rows").get(0).get(0).asInt(); } + private int getNumDocsInLatestSegment() + throws IOException { + String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder. + forSegmentListAPIWithTableType(_tableNameWithType, TableType.OFFLINE.toString())); + JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr); + JsonNode segments = array.get(0).get("OFFLINE"); + String segmentName = segments.get(segments.size() - 1).asText(); + + jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder. + forSegmentMetadata(_tableNameWithType, segmentName)); + JsonNode metadata = JsonUtils.stringToJsonNode(jsonOutputStr); + return metadata.get("segment.total.docs").asInt(); + } + + private void checkTotalDocsInQuery(long expectedTotalDocs) { + TestUtils.waitForCondition(new Function<Void, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable Void aVoid) { + try { + return getTotalDocsFromQuery() == expectedTotalDocs; + } catch (Exception e) { + LOGGER.error("Caught exception when getting totalDocs from query: {}", e.getMessage()); + return null; + } + } + }, 100L, 120_000, "Failed to load " + expectedTotalDocs + " documents", true); + } + + private void checkNumSegments(int expectedNumSegments) { + TestUtils.waitForCondition(new Function<Void, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable Void aVoid) { + try { + return getNumSegments() == expectedNumSegments; + } catch (Exception e) { + LOGGER.error("Caught exception when getting num segments: {}", e.getMessage()); + return null; + } + } + }, 100L, 120_000, "Failed to load get num 
segments", true); + } + @AfterClass public void tearDown() throws Exception { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java index 65ae7c4..0d9d5ac 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java @@ -26,7 +26,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java index 05aa8f9..d29a3f4 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java @@ -25,7 +25,7 @@ import java.net.URI; import java.net.URISyntaxException; import 
java.util.ArrayList; import java.util.List; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java index f451c2c..b563d26 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java @@ -26,7 +26,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java index 5b07db4..bb2ca79 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java +++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java @@ -26,7 +26,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java index 1757652..b398bee 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java index 824e05a..2926a73 100644 
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java index cad8cf6..f2b2e8d 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java @@ -24,7 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.List; import java.util.Map; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java index aa1eee4..641fbdf 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java @@ -25,7 +25,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java index 3172dac..d9a0db7 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java @@ -25,7 +25,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import 
org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java index a1d9a38..313a219 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java @@ -30,9 +30,9 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.core.util.SegmentPushUtils; import org.apache.pinot.minion.MinionContext; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; -import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -118,6 +118,7 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { resultBuilder.setSegmentName(segmentName); // Segment push task + // TODO: Make this use SegmentUploader pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs, outputSegmentTarURI); resultBuilder.setSucceed(true); } catch (Exception e) { diff --git 
a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml new file mode 100644 index 0000000..7870427 --- /dev/null +++ b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>pinot-segment-uploader</artifactId> + <groupId>org.apache.pinot</groupId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>pinot-segment-uploader-default</artifactId> + <name>Pinot Segment Uploader Default</name> + <url>https://pinot.apache.org/</url> + <properties> + <pinot.root>${basedir}/../../..</pinot.root> + <phase.prop>package</phase.prop> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-core</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java new file mode 100644 index 0000000..5c71575 --- /dev/null +++ b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.segmentuploader; + +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.core.util.IngestionUtils; +import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.ingestion.batch.BatchConfig; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Default implementation of {@link SegmentUploader} with support for all push modes + * The configs for push are fetched from batchConfigMaps of tableConfig + */ +public class SegmentUploaderDefault implements SegmentUploader { + + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUploaderDefault.class); + + private String _tableNameWithType; + private BatchConfig _batchConfig; + + @Override + public void init(TableConfig tableConfig) + throws Exception { + _tableNameWithType = tableConfig.getTableName(); + + Preconditions.checkState( + tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null + && CollectionUtils + 
.isNotEmpty(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()), + "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps in tableConfig for table: %s", + _tableNameWithType); + Preconditions + .checkState(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().size() == 1, + "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType); + _batchConfig = new BatchConfig(_tableNameWithType, + tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().get(0)); + + Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getPushControllerURI()), + "Must provide: %s in batchConfigs for table: %s", BatchConfigProperties.PUSH_CONTROLLER_URI, + _tableNameWithType); + + LOGGER.info("Initialized {} for table: {}", SegmentUploaderDefault.class.getName(), _tableNameWithType); + } + + @Override + public void uploadSegment(URI segmentTarURI, @Nullable AuthContext authContext) + throws Exception { + IngestionUtils + .uploadSegment(_tableNameWithType, _batchConfig, Collections.singletonList(segmentTarURI), authContext); + LOGGER.info("Successfully uploaded segment: {} to table: {}", segmentTarURI, _tableNameWithType); + } + + @Override + public void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext authContext) + throws Exception { + + List<URI> segmentTarURIs = new ArrayList<>(); + PinotFS outputPinotFS = IngestionUtils.getOutputPinotFS(_batchConfig, segmentDir); + String[] filePaths = outputPinotFS.listFiles(segmentDir, true); + for (String filePath : filePaths) { + URI uri = URI.create(filePath); + if (!outputPinotFS.isDirectory(uri) && filePath.endsWith(Constants.TAR_GZ_FILE_EXT)) { + segmentTarURIs.add(uri); + } + } + IngestionUtils.uploadSegment(_tableNameWithType, _batchConfig, segmentTarURIs, authContext); + LOGGER.info("Successfully uploaded segments: {} to table: {}", segmentTarURIs, _tableNameWithType); + } +} diff --git 
a/pinot-plugins/pinot-segment-uploader/pom.xml b/pinot-plugins/pinot-segment-uploader/pom.xml new file mode 100644 index 0000000..e5e05fb --- /dev/null +++ b/pinot-plugins/pinot-segment-uploader/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>pinot-plugins</artifactId> + <groupId>org.apache.pinot</groupId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <artifactId>pinot-segment-uploader</artifactId> + <packaging>pom</packaging> + <name>Pinot Segment Uploader</name> + <url>https://pinot.apache.org/</url> + <properties> + <pinot.root>${basedir}/../..</pinot.root> + <plugin.type>pinot-segment-uploader</plugin.type> + </properties> + + <modules> + <module>pinot-segment-uploader-default</module> + </modules> + + <dependencies> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-spi</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java index abae7c2..e1c090c 100644 --- a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java +++ 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java @@ -145,7 +145,7 @@ public class FileBasedSegmentWriter implements SegmentWriter { * Successful completion of segment will return the segment URI. * The buffer will be reset and ready to accept further records via <code>collect()</code> * - * If an exception is throw, the buffer will not be reset + * If an exception is thrown, the buffer will not be reset * and so, <code>flush()</code> can be invoked repeatedly in a retry loop. * If a successful invocation is not achieved,<code>close()</code> followed by <code>init</code> will have to be * called in order to reset the buffer and resume record writing. diff --git a/pinot-plugins/pom.xml b/pinot-plugins/pom.xml index bd5efbc..238e02b 100644 --- a/pinot-plugins/pom.xml +++ b/pinot-plugins/pom.xml @@ -47,6 +47,7 @@ <module>pinot-minion-tasks</module> <module>pinot-metrics</module> <module>pinot-segment-writer</module> + <module>pinot-segment-uploader</module> </modules> <dependencies> diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java new file mode 100644 index 0000000..5a9798c --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.auth; + +/** + * Container for all auth related info + */ +public class AuthContext { + private final String _authToken; + + public AuthContext(String authToken) { + _authToken = authToken; + } + + public String getAuthToken() { + return _authToken; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java index aed5eec..4495c85 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java @@ -52,6 +52,15 @@ public class BatchConfig { private final boolean _excludeSequenceId; private final String _sequenceId; + private final String _pushMode; + private final int _pushAttempts; + private final int _pushParallelism; + private final long _pushIntervalRetryMillis; + private final String _pushSegmentURIPrefix; + private final String _pushSegmentURISuffix; + private final String _pushControllerURI; + private final String _outputSegmentDirURI; + public BatchConfig(String tableNameWithType, Map<String, String> batchConfigsMap) { _batchConfigMap = batchConfigsMap; _tableNameWithType = tableNameWithType; @@ -78,7 +87,7 @@ public class BatchConfig { _recordReaderProps = IngestionConfigUtils .extractPropsMatchingPrefix(batchConfigsMap, BatchConfigProperties.RECORD_READER_PROP_PREFIX); - _segmentNameGeneratorType = batchConfigsMap.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE); + 
_segmentNameGeneratorType = IngestionConfigUtils.getSegmentNameGeneratorType(batchConfigsMap); _segmentNameGeneratorConfigs = IngestionConfigUtils .extractPropsMatchingPrefix(batchConfigsMap, BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX); Map<String, String> segmentNameGeneratorProps = IngestionConfigUtils.getSegmentNameGeneratorProps(batchConfigsMap); @@ -87,6 +96,15 @@ public class BatchConfig { _segmentNamePostfix = segmentNameGeneratorProps.get(BatchConfigProperties.SEGMENT_NAME_POSTFIX); _excludeSequenceId = Boolean.parseBoolean(segmentNameGeneratorProps.get(BatchConfigProperties.EXCLUDE_SEQUENCE_ID)); _sequenceId = batchConfigsMap.get(BatchConfigProperties.SEQUENCE_ID); + + _pushMode = IngestionConfigUtils.getPushMode(batchConfigsMap); + _pushAttempts = IngestionConfigUtils.getPushAttempts(batchConfigsMap); + _pushParallelism = IngestionConfigUtils.getPushParallelism(batchConfigsMap); + _pushIntervalRetryMillis = IngestionConfigUtils.getPushRetryIntervalMillis(batchConfigsMap); + _pushSegmentURIPrefix = batchConfigsMap.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX); + _pushSegmentURISuffix = batchConfigsMap.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX); + _pushControllerURI = batchConfigsMap.get(BatchConfigProperties.PUSH_CONTROLLER_URI); + _outputSegmentDirURI = batchConfigsMap.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI); } public String getTableNameWithType() { @@ -165,6 +183,38 @@ public class BatchConfig { return _sequenceId; } + public String getPushMode() { + return _pushMode; + } + + public int getPushAttempts() { + return _pushAttempts; + } + + public int getPushParallelism() { + return _pushParallelism; + } + + public long getPushIntervalRetryMillis() { + return _pushIntervalRetryMillis; + } + + public String getPushSegmentURIPrefix() { + return _pushSegmentURIPrefix; + } + + public String getPushSegmentURISuffix() { + return _pushSegmentURISuffix; + } + + public String getPushControllerURI() { + return _pushControllerURI; 
+ } + + public String getOutputSegmentDirURI() { + return _outputSegmentDirURI; + } + public Map<String, String> getBatchConfigMap() { return _batchConfigMap; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java index b208814..a82553f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java @@ -50,6 +50,9 @@ public class BatchConfigProperties { public static final String OVERWRITE_OUTPUT = "overwriteOutput"; public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri"; public static final String PUSH_MODE = "push.mode"; + public static final String PUSH_ATTEMPTS = "push.attempts"; + public static final String PUSH_PARALLELISM = "push.parallelism"; + public static final String PUSH_RETRY_INTERVAL_MILLIS = "push.retry.interval.millis"; public static final String PUSH_CONTROLLER_URI = "push.controllerUri"; public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix"; public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix"; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java index 64d6cfa..995b597 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java @@ -19,7 +19,9 @@ package org.apache.pinot.spi.ingestion.segment.uploader; import java.net.URI; +import javax.annotation.Nullable; import org.apache.pinot.spi.annotations.InterfaceStability; +import org.apache.pinot.spi.auth.AuthContext; import org.apache.pinot.spi.config.table.TableConfig; @@ -39,14 
+41,17 @@ public interface SegmentUploader { /** * Uploads the segment tar file to the cluster * @param segmentTarFile URI of segment tar file + * @param authContext auth details required to upload pinot segment to controller */ - void uploadSegment(URI segmentTarFile) + void uploadSegment(URI segmentTarFile, @Nullable AuthContext authContext) throws Exception; /** - * Uploads the segments from the segmentDir to the cluster + * Uploads the segments from the segmentDir to the cluster. + * Looks for segmentTar files recursively, with suffix .tar.gz * @param segmentDir URI of directory containing segment tar files + * @param authContext auth details required to upload pinot segment to controller */ - void uploadSegments(URI segmentDir) + void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext authContext) throws Exception; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index a1d7530..74f68a9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; @@ -34,7 +35,12 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; */ public final class IngestionConfigUtils { public static final String DOT_SEPARATOR = "."; + private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE = + BatchConfigProperties.SegmentNameGeneratorType.SIMPLE; private static final String DEFAULT_PUSH_MODE = "tar"; + private static final int DEFAULT_PUSH_ATTEMPTS = 5; + private static 
final int DEFAULT_PUSH_PARALLELISM = 1; + private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L; /** * Fetches the streamConfig from the given realtime table. @@ -158,11 +164,51 @@ public final class IngestionConfigUtils { return props; } + /** + * Extracts the segment name generator type from the batchConfigMap, or returns default value if not found + */ + public static String getSegmentNameGeneratorType(Map<String, String> batchConfigMap) { + return batchConfigMap + .getOrDefault(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, DEFAULT_SEGMENT_NAME_GENERATOR_TYPE); + } + + /** + * Extracts the push mode from the batchConfigMap, or returns default value if not found + */ public static String getPushMode(Map<String, String> batchConfigMap) { - String pushMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE); - if (pushMode == null) { - pushMode = DEFAULT_PUSH_MODE; + return batchConfigMap.getOrDefault(BatchConfigProperties.PUSH_MODE, DEFAULT_PUSH_MODE); + } + + /** + * Extracts the push attempts from the batchConfigMap, or returns default value if not found + */ + public static int getPushAttempts(Map<String, String> batchConfigMap) { + String pushAttempts = batchConfigMap.get(BatchConfigProperties.PUSH_ATTEMPTS); + if (StringUtils.isNumeric(pushAttempts)) { + return Integer.parseInt(pushAttempts); + } + return DEFAULT_PUSH_ATTEMPTS; + } + + /** + * Extracts the push parallelism from the batchConfigMap, or returns default value if not found + */ + public static int getPushParallelism(Map<String, String> batchConfigMap) { + String pushParallelism = batchConfigMap.get(BatchConfigProperties.PUSH_PARALLELISM); + if (StringUtils.isNumeric(pushParallelism)) { + return Integer.parseInt(pushParallelism); + } + return DEFAULT_PUSH_PARALLELISM; + } + + /** + * Extracts the push retry interval millis from the batchConfigMap, or returns default value if not found + */ + public static long getPushRetryIntervalMillis(Map<String, String> batchConfigMap) { + 
String pushRetryIntervalMillis = batchConfigMap.get(BatchConfigProperties.PUSH_RETRY_INTERVAL_MILLIS); + if (StringUtils.isNumeric(pushRetryIntervalMillis)) { + return Long.parseLong(pushRetryIntervalMillis); } - return pushMode; + return DEFAULT_PUSH_RETRY_INTERVAL_MILLIS; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org