This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 67e04c89d13937e1a594ccd14950fda8807affe7
Author: Alexander Pucher <a...@alexpucher.com>
AuthorDate: Tue Feb 16 16:23:37 2021 -0800

    transitioning more calls for auth support
---
 .../common/utils/FileUploadDownloadClient.java     | 62 +++++-----------------
 .../resources/PinotIngestionRestletResource.java   | 22 ++++++--
 .../pinot/controller/util/FileIngestionHelper.java | 16 +++---
 .../pinot/controller/util/FileIngestionUtils.java  | 16 ++----
 .../apache/pinot/core/auth/BasicAuthPrincipal.java | 18 +++++++
 .../org/apache/pinot/core/auth/BasicAuthUtils.java | 19 ++++++-
 .../ingestion/batch/common/SegmentPushUtils.java   |  5 +-
 .../ingestion/common/DefaultControllerRestApi.java |  8 ++-
 .../apache/pinot/tools/utils/PinotConfigUtils.java |  2 +-
 9 files changed, 89 insertions(+), 79 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 401b036..b4da975 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -447,21 +447,6 @@ public class FileUploadDownloadClient implements Closeable 
{
    * @param uri URI
    * @param schemaName Schema name
    * @param schemaFile Schema file
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse addSchema(URI uri, String schemaName, File 
schemaFile)
-      throws IOException, HttpErrorStatusException {
-    return sendRequest(getAddSchemaRequest(uri, schemaName, schemaFile, null, 
null));
-  }
-
-  /**
-   * Add schema.
-   *
-   * @param uri URI
-   * @param schemaName Schema name
-   * @param schemaFile Schema file
    * @param headers HTTP headers
    * @param parameters HTTP parameters
    * @return Response
@@ -475,21 +460,6 @@ public class FileUploadDownloadClient implements Closeable 
{
   }
 
   /**
-   * Update schema.
-   *
-   * @param uri URI
-   * @param schemaName Schema name
-   * @param schemaFile Schema file
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse updateSchema(URI uri, String schemaName, File 
schemaFile)
-      throws IOException, HttpErrorStatusException {
-    return sendRequest(getUpdateSchemaRequest(uri, schemaName, schemaFile));
-  }
-
-  /**
    * Upload segment by sending a zip of creation.meta and metadata.properties.
    *
    * @param uri URI
@@ -581,25 +551,6 @@ public class FileUploadDownloadClient implements Closeable 
{
   }
 
   /**
-   * Upload segment with segment file input stream using default settings. 
Include table name as a request parameter.
-   *
-   * @param uri URI
-   * @param segmentName Segment name
-   * @param inputStream Segment file input stream
-   * @param rawTableName Raw table name
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, 
InputStream inputStream, String rawTableName)
-      throws IOException, HttpErrorStatusException {
-    // Add table name as a request parameter
-    NameValuePair tableNameValuePair = new 
BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName);
-    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
-    return uploadSegment(uri, segmentName, inputStream, null, parameters, 
DEFAULT_SOCKET_TIMEOUT_MS);
-  }
-
-  /**
    * Send segment uri.
    *
    * Note: table name has to be set as a parameter.
@@ -783,7 +734,7 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Generate an (optional) HTTP Authorization header given an auth token
+   * Generate an (optional) HTTP Authorization header given an auth token.
    *
    * @param authToken auth token
    * @return list of 0 or 1 "Authorization" headers
@@ -794,4 +745,15 @@ public class FileUploadDownloadClient implements Closeable 
{
     }
     return Collections.singletonList(new BasicHeader("Authorization", 
authToken));
   }
+
+  /**
+   * Generate a param list with a table name attribute.
+   *
+   * @param tableName table name
+   * @return param list
+   */
+  public static List<NameValuePair> makeTableParam(String tableName) {
+    return Collections
+        .singletonList(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName));
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index f37a251..0695db5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import java.io.File;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
@@ -36,6 +37,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
@@ -116,8 +118,7 @@ public class PinotIngestionRestletResource {
       + "\n Example usage (query params need encoding):" + "\n```"
       + "\ncurl -X POST -F file=@data.json -H \"Content-Type: 
multipart/form-data\" 
\"http://localhost:9000/ingestFromFile?tableNameWithType=foo_OFFLINE&";
       + "\nbatchConfigMapStr={" + "\n  \"inputFormat\":\"csv\"," + "\n  
\"recordReader.prop.delimiter\":\"|\""
-      + "\n}\" "
-      + "\n```")
+      + "\n}\" " + "\n```")
   public void ingestFromFile(
       @ApiParam(value = "Name of the table to upload the file to", required = 
true) @QueryParam("tableNameWithType") String tableNameWithType,
       @ApiParam(value = "Batch config Map as json string. Must pass 
inputFormat, and optionally record reader properties. e.g. 
{\"inputFormat\":\"json\"}", required = true) @QueryParam("batchConfigMapStr") 
String batchConfigMapStr,
@@ -191,8 +192,21 @@ public class PinotIngestionRestletResource {
     Schema schema = 
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
 
     FileIngestionHelper fileIngestionHelper =
-        new FileIngestionHelper(tableConfig, schema, batchConfig, 
_controllerConf.getControllerHost(),
-            Integer.parseInt(_controllerConf.getControllerPort()), new 
File(_controllerConf.getDataDir(), UPLOAD_DIR));
+        new FileIngestionHelper(tableConfig, schema, batchConfig, 
getControllerUri(),
+            new File(_controllerConf.getDataDir(), UPLOAD_DIR), 
getAuthToken());
     return fileIngestionHelper.buildSegmentAndPush(payload);
   }
+
+  private String getAuthToken() {
+    return _controllerConf
+        
.getProperty(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY
 + ".auth.token");
+  }
+
+  private URI getControllerUri() {
+    try {
+      return new URI(_controllerConf.generateVipUrl());
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException("Controller VIP uri is invalid", e);
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 49521fe..d82f6a7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -50,18 +50,18 @@ public class FileIngestionHelper {
   private final TableConfig _tableConfig;
   private final Schema _schema;
   private final BatchConfig _batchConfig;
-  private final String _controllerHost;
-  private final int _controllerPort;
+  private final URI _controllerUri;
   private final File _uploadDir;
+  private final String _authToken;
 
-  public FileIngestionHelper(TableConfig tableConfig, Schema schema, 
BatchConfig batchConfig, String controllerHost,
-      int controllerPort, File uploadDir) {
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema, 
BatchConfig batchConfig, URI controllerUri,
+      File uploadDir, String authToken) {
     _tableConfig = tableConfig;
     _schema = schema;
     _batchConfig = batchConfig;
-    _controllerHost = controllerHost;
-    _controllerPort = controllerPort;
+    _controllerUri = controllerUri;
     _uploadDir = uploadDir;
+    _authToken = authToken;
   }
 
   /**
@@ -106,8 +106,8 @@ public class FileIngestionHelper {
           new File(segmentTarDir, segmentName + 
org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT);
       TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), 
segmentTarFile);
       FileIngestionUtils
-          .uploadSegment(tableNameWithType, 
Lists.newArrayList(segmentTarFile), _controllerHost, _controllerPort);
-      LOGGER.info("Uploaded tar: {} to {}:{}", 
segmentTarFile.getAbsolutePath(), _controllerHost, _controllerPort);
+          .uploadSegment(tableNameWithType, 
Lists.newArrayList(segmentTarFile), _controllerUri, _authToken);
+      LOGGER.info("Uploaded tar: {} to {}", segmentTarFile.getAbsolutePath(), 
_controllerUri);
 
       return new SuccessResponse(
           "Successfully ingested file into table: " + tableNameWithType + " as 
segment: " + segmentName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java
index 7346eb4..716c3bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionUtils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.controller.util;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FileInputStream;
@@ -28,8 +26,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.IOUtils;
@@ -42,13 +38,10 @@ import 
org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
-import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfig;
-import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -141,8 +134,7 @@ public final class FileIngestionUtils {
   /**
    * Uploads the segment tar files to the provided controller
    */
-  public static void uploadSegment(String tableNameWithType, List<File> 
tarFiles, String controllerHost,
-      int controllerPort)
+  public static void uploadSegment(String tableNameWithType, List<File> 
tarFiles, URI controllerUri, String authToken)
       throws RetriableOperationException, AttemptsExceededException {
     for (File tarFile : tarFiles) {
       String fileName = tarFile.getName();
@@ -154,8 +146,10 @@ public final class FileIngestionUtils {
       RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, 
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
         try (InputStream inputStream = new FileInputStream(tarFile)) {
           SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-              
.uploadSegment(FileUploadDownloadClient.getUploadSegmentHttpURI(controllerHost, 
controllerPort),
-                  segmentName, inputStream, tableNameWithType);
+              
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), 
segmentName, inputStream,
+                  FileUploadDownloadClient.makeAuthHeader(authToken),
+                  FileUploadDownloadClient.makeTableParam(tableNameWithType),
+                  FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
           LOGGER.info("Response for pushing table {} segment {} - {}: {}", 
tableNameWithType, segmentName,
               response.getStatusCode(), response.getResponse());
           return true;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
index ae44401..8dd735e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.core.auth;
 
 import java.util.Set;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
index 46cfa6b..6a3cc39 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java
@@ -1,10 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.core.auth;
 
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index d15cfcd..5cc69ba 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -26,7 +26,6 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -120,8 +119,8 @@ public class SegmentPushUtils implements Serializable {
           try (InputStream inputStream = fileSystem.open(tarFileURI)) {
             SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
                 
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), 
segmentName, inputStream,
-                    
FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), 
Collections.singletonList(
-                        new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName)),
+                    
FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()),
+                    FileUploadDownloadClient.makeTableParam(tableName),
                     FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response for pushing table {} segment {} to location 
{} - {}: {}", tableName, segmentName,
                 controllerURI, response.getStatusCode(), 
response.getResponse());
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
index 2880482..391d33b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * Deprecated. Does not support HTTPS or authentication
+ */
 public class DefaultControllerRestApi implements ControllerRestApi {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultControllerRestApi.class);
 
@@ -114,7 +118,9 @@ public class DefaultControllerRestApi implements 
ControllerRestApi {
           try (InputStream inputStream = fileSystem.open(tarFilePath)) {
             SimpleHttpResponse response = 
_fileUploadDownloadClient.uploadSegment(
                 
FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), 
pushLocation.getPort()),
-                segmentName, inputStream, _rawTableName);
+                segmentName, inputStream, Collections.emptyList(),
+                FileUploadDownloadClient.makeTableParam(_rawTableName),
+                FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response {}: {}", response.getStatusCode(), 
response.getResponse());
             break;
           } catch (Exception e) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 7a17845..17338af 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -76,7 +76,7 @@ public class PinotConfigUtils {
     properties.put("controller.admin.access.control.principals.user.password", 
"secret");
     properties.put("controller.admin.access.control.principals.user.tables", 
"baseballStats");
     
properties.put("controller.admin.access.control.principals.user.permissions", 
"read");
-    properties.put("pinot.controller.segment.fetcher.auth.token", "Basic 
YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("controller.segment.fetcher.auth.token", "Basic 
YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to