This is an automated email from the ASF dual-hosted git repository. apucher pushed a commit to branch basic-auth-controller in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 67e04c89d13937e1a594ccd14950fda8807affe7 Author: Alexander Pucher <a...@alexpucher.com> AuthorDate: Tue Feb 16 16:23:37 2021 -0800 transitioning more calls for auth support --- .../common/utils/FileUploadDownloadClient.java | 62 +++++----------------- .../resources/PinotIngestionRestletResource.java | 22 ++++++-- .../pinot/controller/util/FileIngestionHelper.java | 16 +++--- .../pinot/controller/util/FileIngestionUtils.java | 16 ++---- .../apache/pinot/core/auth/BasicAuthPrincipal.java | 18 +++++++ .../org/apache/pinot/core/auth/BasicAuthUtils.java | 19 ++++++- .../ingestion/batch/common/SegmentPushUtils.java | 5 +- .../ingestion/common/DefaultControllerRestApi.java | 8 ++- .../apache/pinot/tools/utils/PinotConfigUtils.java | 2 +- 9 files changed, 89 insertions(+), 79 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 401b036..b4da975 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -447,21 +447,6 @@ public class FileUploadDownloadClient implements Closeable { * @param uri URI * @param schemaName Schema name * @param schemaFile Schema file - * @return Response - * @throws IOException - * @throws HttpErrorStatusException - */ - public SimpleHttpResponse addSchema(URI uri, String schemaName, File schemaFile) - throws IOException, HttpErrorStatusException { - return sendRequest(getAddSchemaRequest(uri, schemaName, schemaFile, null, null)); - } - - /** - * Add schema. - * - * @param uri URI - * @param schemaName Schema name - * @param schemaFile Schema file * @param headers HTTP headers * @param parameters HTTP parameters * @return Response @@ -475,21 +460,6 @@ public class FileUploadDownloadClient implements Closeable { } /** - * Update schema. - * - * @param uri URI - * @param schemaName Schema name - * @param schemaFile Schema file - * @return Response - * @throws IOException - * @throws HttpErrorStatusException - */ - public SimpleHttpResponse updateSchema(URI uri, String schemaName, File schemaFile) - throws IOException, HttpErrorStatusException { - return sendRequest(getUpdateSchemaRequest(uri, schemaName, schemaFile)); - } - - /** * Upload segment by sending a zip of creation.meta and metadata.properties. * * @param uri URI @@ -581,25 +551,6 @@ public class FileUploadDownloadClient implements Closeable { } /** - * Upload segment with segment file input stream using default settings. Include table name as a request parameter. - * - * @param uri URI - * @param segmentName Segment name - * @param inputStream Segment file input stream - * @param rawTableName Raw table name - * @return Response - * @throws IOException - * @throws HttpErrorStatusException - */ - public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream inputStream, String rawTableName) - throws IOException, HttpErrorStatusException { - // Add table name as a request parameter - NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName); - List<NameValuePair> parameters = Arrays.asList(tableNameValuePair); - return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS); - } - - /** * Send segment uri. * * Note: table name has to be set as a parameter. @@ -783,7 +734,7 @@ public class FileUploadDownloadClient implements Closeable { } /** - * Generate an (optional) HTTP Authorization header given an auth token + * Generate an (optional) HTTP Authorization header given an auth token. * * @param authToken auth token * @return list of 0 or 1 "Authorization" headers @@ -794,4 +745,15 @@ public class FileUploadDownloadClient implements Closeable { } return Collections.singletonList(new BasicHeader("Authorization", authToken)); } + + /** + * Generate a param list with a table name attribute. + * + * @param tableName table name + * @return param list + */ + public static List<NameValuePair> makeTableParam(String tableName) { + return Collections + .singletonList(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName)); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java index f37a251..0695db5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java @@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import java.io.File; import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; import javax.inject.Inject; import javax.ws.rs.Consumes; @@ -36,6 +37,7 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; @@ -116,8 +118,7 @@ public class PinotIngestionRestletResource { + "\n Example usage (query params need encoding):" + "\n```" + "\ncurl -X POST -F file=@data.json -H \"Content-Type: multipart/form-data\" \"http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&" + "\nbatchConfigMapStr={" + "\n \"inputFormat\":\"csv\"," + "\n \"recordReader.prop.delimiter\":\"|\"" - + "\n}\" " - + "\n```") + + "\n}\" " + "\n```") public void ingestFromFile( @ApiParam(value = "Name of the table to upload the file to", required = true) @QueryParam("tableNameWithType") String tableNameWithType, @ApiParam(value = "Batch config Map as json string. Must pass inputFormat, and optionally record reader properties. e.g. {\"inputFormat\":\"json\"}", required = true) @QueryParam("batchConfigMapStr") String batchConfigMapStr, @@ -191,8 +192,21 @@ public class PinotIngestionRestletResource { Schema schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType); FileIngestionHelper fileIngestionHelper = - new FileIngestionHelper(tableConfig, schema, batchConfig, _controllerConf.getControllerHost(), - Integer.parseInt(_controllerConf.getControllerPort()), new File(_controllerConf.getDataDir(), UPLOAD_DIR)); + new FileIngestionHelper(tableConfig, schema, batchConfig, getControllerUri(), + new File(_controllerConf.getDataDir(), UPLOAD_DIR), getAuthToken()); return fileIngestionHelper.buildSegmentAndPush(payload); } + + private String getAuthToken() { + return _controllerConf + .getProperty(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".auth.token"); + } + + private URI getControllerUri() { + try { + return new URI(_controllerConf.generateVipUrl()); + } catch (URISyntaxException e) { + throw new IllegalStateException("Controller VIP uri is invalid", e); + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java index 49521fe..d82f6a7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java @@ -50,18 +50,18 @@ public class FileIngestionHelper { private final TableConfig _tableConfig; private final Schema _schema; private final BatchConfig _batchConfig; - private final String _controllerHost; - private final int _controllerPort; + private final URI _controllerUri; private final File _uploadDir; + private final String _authToken; - public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, String controllerHost, - int controllerPort, File uploadDir) { + public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, URI controllerUri, + File uploadDir, String authToken) { _tableConfig = tableConfig; _schema = schema; _batchConfig = batchConfig; - _controllerHost = controllerHost; - _controllerPort = controllerPort; + _controllerUri = controllerUri; _uploadDir = uploadDir; + _authToken = authToken; } /** @@ -106,8 +106,8 @@ public class FileIngestionHelper { new File(segmentTarDir, segmentName + org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT); TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), segmentTarFile); FileIngestionUtils - .uploadSegment(tableNameWithType, Lists.newArrayList(segmentTarFile), _controllerHost, _controllerPort); - LOGGER.info("Uploaded tar: {} to {}:{}", segmentTarFile.getAbsolutePath(), _controllerHost, _controllerPort); + .uploadSegment(tableNameWithType, Lists.newArrayList(segmentTarFile), _controllerUri, _authToken); + LOGGER.info("Uploaded tar: {} to {}", segmentTarFile.getAbsolutePath(), _controllerUri); return new SuccessResponse( "Successfully ingested file into table: " + tableNameWithType + " as segment: " + segmentName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java index 7346eb4..716c3bc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java @@ -18,8 +18,6 @@ */ package org.apache.pinot.controller.util; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import java.io.File; import java.io.FileInputStream; @@ -28,8 +26,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; @@ -42,13 +38,10 @@ import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.FileFormat; -import org.apache.pinot.spi.data.readers.RecordReaderConfig; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.BatchConfig; -import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.IngestionConfigUtils; -import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.apache.pinot.spi.utils.retry.RetriableOperationException; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -141,8 +134,7 @@ public final class FileIngestionUtils { /** * Uploads the segment tar files to the provided controller */ - public static void uploadSegment(String tableNameWithType, List<File> tarFiles, String controllerHost, - int controllerPort) + public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, String authToken) throws RetriableOperationException, AttemptsExceededException { for (File tarFile : tarFiles) { String fileName = tarFile.getName(); @@ -154,8 +146,10 @@ public final class FileIngestionUtils { RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> { try (InputStream inputStream = new FileInputStream(tarFile)) { SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .uploadSegment(FileUploadDownloadClient.getUploadSegmentHttpURI(controllerHost, controllerPort), - segmentName, inputStream, tableNameWithType); + .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream, + FileUploadDownloadClient.makeAuthHeader(authToken), + FileUploadDownloadClient.makeTableParam(tableNameWithType), + FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName, response.getStatusCode(), response.getResponse()); return true; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java index ae44401..8dd735e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pinot.core.auth; import java.util.Set; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java index 46cfa6b..6a3cc39 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java @@ -1,10 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pinot.core.auth; import com.google.common.base.Preconditions; import java.util.Arrays; import java.util.Base64; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java index d15cfcd..5cc69ba 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java @@ -26,7 +26,6 @@ import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -120,8 +119,8 @@ public class SegmentPushUtils implements Serializable { try (InputStream inputStream = fileSystem.open(tarFileURI)) { SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, inputStream, - FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), Collections.singletonList( - new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName)), + FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), + FileUploadDownloadClient.makeTableParam(tableName), FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName, controllerURI, response.getStatusCode(), response.getResponse()); diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java index 2880482..391d33b 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,6 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Deprecated. Does not support HTTPS or authentication + */ public class DefaultControllerRestApi implements ControllerRestApi { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class); @@ -114,7 +118,9 @@ public class DefaultControllerRestApi implements ControllerRestApi { try (InputStream inputStream = fileSystem.open(tarFilePath)) { SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment( FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()), - segmentName, inputStream, _rawTableName); + segmentName, inputStream, Collections.emptyList(), + FileUploadDownloadClient.makeTableParam(_rawTableName), + FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse()); break; } catch (Exception e) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java index 7a17845..17338af 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java @@ -76,7 +76,7 @@ public class PinotConfigUtils { properties.put("controller.admin.access.control.principals.user.password", "secret"); properties.put("controller.admin.access.control.principals.user.tables", "baseballStats"); properties.put("controller.admin.access.control.principals.user.permissions", "read"); - properties.put("pinot.controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); + properties.put("controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); return properties; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org