icefury71 commented on a change in pull request #6740: URL: https://github.com/apache/incubator-pinot/pull/6740#discussion_r609282243
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java ########## @@ -185,38 +177,107 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig) } /** - * Uploads the segment tar files to the provided controller + * Uploads the segments from the provided segmentTar URIs to the table, using push details from the batchConfig + * @param tableNameWithType name of the table to upload the segment + * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc + * @param segmentTarURIs list of URI for the segment tar files + * @param authContext auth details required to upload the Pinot segment to controller */ - public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, - final String authToken) - throws RetriableOperationException, AttemptsExceededException { - for (File tarFile : tarFiles) { - String fileName = tarFile.getName(); - Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); - String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); - - RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> { - try (InputStream inputStream = new FileInputStream(tarFile)) { - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream, - FileUploadDownloadClient.makeAuthHeader(authToken), - FileUploadDownloadClient.makeTableParam(tableNameWithType), - FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName, - response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int statusCode = e.getStatusCode(); - if (statusCode >= 500) { - LOGGER.warn("Caught temporary exception 
while pushing table: {} segment: {}, will retry", tableNameWithType, - segmentName, e); - return false; - } else { - throw e; + public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List<URI> segmentTarURIs, + @Nullable AuthContext authContext) + throws Exception { + + SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext); + + String pushMode = batchConfig.getPushMode(); + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, + segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList())); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); Review comment: nit: attach a friendly message to the exception maybe? (Error while pushing a segment (tarball) to Pinot Controller) ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java ########## @@ -185,38 +177,107 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig) } /** - * Uploads the segment tar files to the provided controller + * Uploads the segments from the provided segmentTar URIs to the table, using push details from the batchConfig + * @param tableNameWithType name of the table to upload the segment + * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc + * @param segmentTarURIs list of URI for the segment tar files + * @param authContext auth details required to upload the Pinot segment to controller */ - public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, - final String authToken) - throws RetriableOperationException, AttemptsExceededException { - for (File tarFile : tarFiles) { - String fileName = tarFile.getName(); - 
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); - String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); - - RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> { - try (InputStream inputStream = new FileInputStream(tarFile)) { - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream, - FileUploadDownloadClient.makeAuthHeader(authToken), - FileUploadDownloadClient.makeTableParam(tableNameWithType), - FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName, - response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int statusCode = e.getStatusCode(); - if (statusCode >= 500) { - LOGGER.warn("Caught temporary exception while pushing table: {} segment: {}, will retry", tableNameWithType, - segmentName, e); - return false; - } else { - throw e; + public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List<URI> segmentTarURIs, + @Nullable AuthContext authContext) + throws Exception { + + SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext); + + String pushMode = batchConfig.getPushMode(); + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, + segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList())); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case URI: + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = 
URI.create(batchConfig.getOutputSegmentDirURI()); } + List<String> segmentUris = new ArrayList<>(); + for (URI segmentTarURI : segmentTarURIs) { + URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI, + segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); + } + SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case METADATA: + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); + } + PinotFS outputFileFS = getOutputPinotFS(batchConfig, outputSegmentDirURI); + Map<String, String> segmentUriToTarPathMap = SegmentPushUtils + .getSegmentUriToTarPathMap(outputSegmentDirURI, segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(), new String[]{segmentTarURIs.toString()}); + SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, outputFileFS, segmentUriToTarPathMap); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); } - }); + break; + default: + throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode); + } + } + + private static SegmentGenerationJobSpec generateSegmentUploadSpec(String tableName, BatchConfig batchConfig, + @Nullable AuthContext authContext) { + + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(tableName); + + PinotClusterSpec pinotClusterSpec = new PinotClusterSpec(); + pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI()); + PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec}; + + PushJobSpec pushJobSpec = new PushJobSpec(); + 
pushJobSpec.setPushAttempts(batchConfig.getPushAttempts()); + pushJobSpec.setPushParallelism(batchConfig.getPushParallelism()); + pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis()); + pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix()); + pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix()); + + SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec(); + spec.setPushJobSpec(pushJobSpec); + spec.setTableSpec(tableSpec); + spec.setPinotClusterSpecs(pinotClusterSpecs); + if (authContext != null && StringUtils.isNotBlank(authContext.getAuthToken())) { + spec.setAuthToken(authContext.getAuthToken()); + } + return spec; + } + + /** + * Creates an instance of the PinotFS using the fileURI and fs properties from BatchConfig + */ + public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI) { + String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme(); + if (fileURIScheme == null) { + fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) { Review comment: nit: given that the function is named 'getOutputPinotFS', maybe this part (registration) could be extracted into a separate method for readability? ########## File path: pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java ########## @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.segmentuploader; + +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.core.util.IngestionUtils; +import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.ingestion.batch.BatchConfig; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Default implementation of {@link SegmentUploader} with support for all push modes + * The configs for push are fetched from batchConfigMaps of tableConfig + */ +public class SegmentUploaderDefault implements SegmentUploader { + + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUploaderDefault.class); + + private String _tableNameWithType; + private BatchConfig _batchConfig; + + @Override + public void init(TableConfig tableConfig) + throws Exception { + _tableNameWithType = tableConfig.getTableName(); + + Preconditions.checkState( + tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null + && CollectionUtils + 
.isNotEmpty(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()), + "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps in tableConfig for table: %s", + _tableNameWithType); + Preconditions + .checkState(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().size() == 1, + "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType); + _batchConfig = new BatchConfig(_tableNameWithType, + tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().get(0)); + + Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getPushControllerURI()), + "Must provide: %s in batchConfigs for table: %s", BatchConfigProperties.PUSH_CONTROLLER_URI, + _tableNameWithType); + + LOGGER.info("Initialized {} for table: {}", SegmentUploaderDefault.class.getName(), _tableNameWithType); + } + + @Override + public void uploadSegment(URI segmentTarURI, @Nullable AuthContext authContext) + throws Exception { + IngestionUtils + .uploadSegment(_tableNameWithType, _batchConfig, Collections.singletonList(segmentTarURI), authContext); + LOGGER.info("Successfully uploaded segment: {} to table: {}", segmentTarURI, _tableNameWithType); + } + + @Override + public void uploadSegments(URI segmentDir, @Nullable AuthContext authContext) + throws Exception { + + List<URI> segmentTarURIs = new ArrayList<>(); + PinotFS outputPinotFS = IngestionUtils.getOutputPinotFS(_batchConfig, segmentDir); + String[] files = outputPinotFS.listFiles(segmentDir, true); + for (String file : files) { Review comment: nit: rename the loop to `for (String fileName : fileNames)`?
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java ########## @@ -185,38 +177,107 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig) } /** - * Uploads the segment tar files to the provided controller + * Uploads the segments from the provided segmentTar URIs to the table, using push details from the batchConfig + * @param tableNameWithType name of the table to upload the segment + * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc + * @param segmentTarURIs list of URI for the segment tar files + * @param authContext auth details required to upload the Pinot segment to controller */ - public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, - final String authToken) - throws RetriableOperationException, AttemptsExceededException { - for (File tarFile : tarFiles) { - String fileName = tarFile.getName(); - Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); - String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); - - RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> { - try (InputStream inputStream = new FileInputStream(tarFile)) { - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream, - FileUploadDownloadClient.makeAuthHeader(authToken), - FileUploadDownloadClient.makeTableParam(tableNameWithType), - FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName, - response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int statusCode = e.getStatusCode(); - if (statusCode >= 500) { - LOGGER.warn("Caught temporary exception 
while pushing table: {} segment: {}, will retry", tableNameWithType, - segmentName, e); - return false; - } else { - throw e; + public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List<URI> segmentTarURIs, + @Nullable AuthContext authContext) + throws Exception { + + SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext); + + String pushMode = batchConfig.getPushMode(); + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, + segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList())); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case URI: + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); } + List<String> segmentUris = new ArrayList<>(); + for (URI segmentTarURI : segmentTarURIs) { + URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI, + segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); + } + SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case METADATA: + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); + } + PinotFS outputFileFS = getOutputPinotFS(batchConfig, outputSegmentDirURI); + Map<String, String> segmentUriToTarPathMap = SegmentPushUtils + .getSegmentUriToTarPathMap(outputSegmentDirURI, 
segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(), new String[]{segmentTarURIs.toString()}); + SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, outputFileFS, segmentUriToTarPathMap); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); } - }); + break; + default: + throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode); + } + } + + private static SegmentGenerationJobSpec generateSegmentUploadSpec(String tableName, BatchConfig batchConfig, + @Nullable AuthContext authContext) { + + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(tableName); + + PinotClusterSpec pinotClusterSpec = new PinotClusterSpec(); + pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI()); + PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec}; + + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setPushAttempts(batchConfig.getPushAttempts()); + pushJobSpec.setPushParallelism(batchConfig.getPushParallelism()); + pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis()); + pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix()); + pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix()); + + SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec(); + spec.setPushJobSpec(pushJobSpec); + spec.setTableSpec(tableSpec); + spec.setPinotClusterSpecs(pinotClusterSpecs); + if (authContext != null && StringUtils.isNotBlank(authContext.getAuthToken())) { + spec.setAuthToken(authContext.getAuthToken()); + } + return spec; + } + + /** + * Creates an instance of the PinotFS using the fileURI and fs properties from BatchConfig + */ + public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI) { + String fileURIScheme = (fileURI == null) ? 
null : fileURI.getScheme(); + if (fileURIScheme == null) { + fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) { Review comment: Alternatively, we could rename the function to something like getOrRegisterPinotFS? ########## File path: pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java ########## @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.plugin.segmentuploader; + +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.core.util.IngestionUtils; +import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.ingestion.batch.BatchConfig; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Default implementation of {@link SegmentUploader} with support for all push modes + * The configs for push are fetched from batchConfigMaps of tableConfig + */ +public class SegmentUploaderDefault implements SegmentUploader { + + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUploaderDefault.class); + + private String _tableNameWithType; + private BatchConfig _batchConfig; + + @Override + public void init(TableConfig tableConfig) + throws Exception { + _tableNameWithType = tableConfig.getTableName(); + + Preconditions.checkState( + tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null + && CollectionUtils + .isNotEmpty(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()), + "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps in tableConfig for table: %s", + _tableNameWithType); + Preconditions + .checkState(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().size() == 1, + "batchConfigMaps must contain only 1 BatchConfig for table: %s", 
_tableNameWithType); + _batchConfig = new BatchConfig(_tableNameWithType, + tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().get(0)); + + Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getPushControllerURI()), + "Must provide: %s in batchConfigs for table: %s", BatchConfigProperties.PUSH_CONTROLLER_URI, + _tableNameWithType); + + LOGGER.info("Initialized {} for table: {}", SegmentUploaderDefault.class.getName(), _tableNameWithType); + } + + @Override + public void uploadSegment(URI segmentTarURI, @Nullable AuthContext authContext) + throws Exception { + IngestionUtils + .uploadSegment(_tableNameWithType, _batchConfig, Collections.singletonList(segmentTarURI), authContext); + LOGGER.info("Successfully uploaded segment: {} to table: {}", segmentTarURI, _tableNameWithType); + } + + @Override + public void uploadSegments(URI segmentDir, @Nullable AuthContext authContext) Review comment: Since we're passing a single parameter of type URI (a directory), my suggestion is to rename this function to 'uploadSegmentsFromDirectory'. Intuitively, when I read uploadSegments, I expect a list of URIs. ########## File path: pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java ########## @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.segmentuploader; + +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.core.util.IngestionUtils; +import org.apache.pinot.spi.auth.AuthContext; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.ingestion.batch.BatchConfig; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Default implementation of {@link SegmentUploader} with support for all push modes + * The configs for push are fetched from batchConfigMaps of tableConfig + */ +public class SegmentUploaderDefault implements SegmentUploader { + + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUploaderDefault.class); + + private String _tableNameWithType; + private BatchConfig _batchConfig; + + @Override + public void init(TableConfig tableConfig) + throws Exception { + _tableNameWithType = tableConfig.getTableName(); + + Preconditions.checkState( + tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null + && CollectionUtils + 
.isNotEmpty(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()), + "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps in tableConfig for table: %s", + _tableNameWithType); + Preconditions + .checkState(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().size() == 1, + "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType); + _batchConfig = new BatchConfig(_tableNameWithType, + tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().get(0)); + + Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getPushControllerURI()), + "Must provide: %s in batchConfigs for table: %s", BatchConfigProperties.PUSH_CONTROLLER_URI, + _tableNameWithType); + + LOGGER.info("Initialized {} for table: {}", SegmentUploaderDefault.class.getName(), _tableNameWithType); + } + + @Override + public void uploadSegment(URI segmentTarURI, @Nullable AuthContext authContext) + throws Exception { + IngestionUtils + .uploadSegment(_tableNameWithType, _batchConfig, Collections.singletonList(segmentTarURI), authContext); + LOGGER.info("Successfully uploaded segment: {} to table: {}", segmentTarURI, _tableNameWithType); + } + + @Override + public void uploadSegments(URI segmentDir, @Nullable AuthContext authContext) + throws Exception { + + List<URI> segmentTarURIs = new ArrayList<>(); + PinotFS outputPinotFS = IngestionUtils.getOutputPinotFS(_batchConfig, segmentDir); + String[] files = outputPinotFS.listFiles(segmentDir, true); Review comment: nit: rename `files` to `fileNames`?
########## File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java ########## @@ -158,11 +164,58 @@ public static PinotConfiguration getOutputFsProps(Map<String, String> batchConfi return props; } + /** + * Extracts the segment name generator type from the batchConfigMap, or returns default value if not found + */ + public static String getSegmentNameGeneratorType(Map<String, String> batchConfigMap) { + String segmentNameGeneratorType = batchConfigMap.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE); Review comment: I suppose you could use getOrDefault to make this more concise? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org