This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a3152  Add segment encryption on Controller based on table config (#5617)
40a3152 is described below

commit 40a31529213efa7a5d03eb4e0ca020aa4045e0ca
Author: Sajjad <moradi.saj...@gmail.com>
AuthorDate: Wed Jul 1 13:23:24 2020 -0700

    Add segment encryption on Controller based on table config (#5617)
    
    * Add segment encryption on Controller based on table config
    
    * Applied PR's comments
    
    * Addressed PR's comments
    
    * Add license header + equality check on crypters
    
    * Addressed PR's comments
    
    * Fix log message
    
    Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
    
    Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
---
 .../common/utils/config/TableConfigUtils.java      |  10 +-
 .../pinot/common/utils/helix/TableCache.java       |   4 +
 .../common/utils/config/TableConfigSerDeTest.java  |   6 +-
 .../PinotSegmentUploadDownloadRestletResource.java | 122 ++++++++++++--------
 .../pinot/controller/api/upload/ZKOperator.java    |  11 +-
 .../helix/core/PinotHelixResourceManager.java      |  12 ++
 ...otSegmentUploadDownloadRestletResourceTest.java | 125 +++++++++++++++++++++
 .../controller/api/upload/ZKOperatorTest.java      |  11 +-
 .../SegmentsValidationAndRetentionConfig.java      |  13 ++-
 .../apache/pinot/spi/crypt/NoOpPinotCrypter.java   |  16 ++-
 .../spi/utils/builder/TableConfigBuilder.java      |   7 ++
 11 files changed, 270 insertions(+), 67 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 45610fa..2ee7b47 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -132,8 +132,8 @@ public class TableConfigUtils {
     }
 
     return new TableConfig(tableName, tableType, validationConfig, 
tenantConfig, indexingConfig, customConfig,
-        quotaConfig, taskConfig, routingConfig, queryConfig, 
instanceAssignmentConfigMap, fieldConfigList,
-        upsertConfig, ingestionConfig);
+        quotaConfig, taskConfig, routingConfig, queryConfig, 
instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
+        ingestionConfig);
   }
 
   public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -228,8 +228,10 @@ public class TableConfigUtils {
       }
       String peerSegmentDownloadScheme = 
validationConfig.getPeerSegmentDownloadScheme();
       if (peerSegmentDownloadScheme != null) {
-        if (!"http".equalsIgnoreCase(peerSegmentDownloadScheme) && 
!"https".equalsIgnoreCase(peerSegmentDownloadScheme)) {
-          throw new IllegalStateException("Invalid value '" + 
peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of 
http nor https" );
+        if (!"http".equalsIgnoreCase(peerSegmentDownloadScheme) && !"https"
+            .equalsIgnoreCase(peerSegmentDownloadScheme)) {
+          throw new IllegalStateException("Invalid value '" + 
peerSegmentDownloadScheme
+              + "' for peerSegmentDownloadScheme. Must be one of http nor 
https");
         }
       }
     }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 474adbf..6f46ac2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -80,6 +80,10 @@ public class TableCache {
     return columnName;
   }
 
+  public TableConfig getTableConfig(String tableName) {
+    return _tableConfigChangeListener._tableConfigMap.get(tableName);
+  }
+
   class TableConfigChangeListener implements IZkChildListener, IZkDataListener 
{
 
     Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index af5e5c0..f877c3f 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -248,8 +248,10 @@ public class TableConfigSerDeTest {
     {
       // with SegmentsValidationAndRetentionConfig
       TableConfig tableConfig = 
tableConfigBuilder.setPeerSegmentDownloadScheme("http").build();
-      
checkSegmentsValidationAndRetentionConfig(JsonUtils.stringToObject(tableConfig.toJsonString(),
 TableConfig.class));
-      
checkSegmentsValidationAndRetentionConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
+      checkSegmentsValidationAndRetentionConfig(
+          JsonUtils.stringToObject(tableConfig.toJsonString(), 
TableConfig.class));
+      checkSegmentsValidationAndRetentionConfig(
+          
TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
     }
     {
       // With ingestion config
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 576390d..1c9500e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.api.resources;
 
+import com.google.common.base.Strings;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -54,6 +55,8 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -71,7 +74,6 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.metadata.DefaultMetadataExtractor;
 import org.apache.pinot.core.metadata.MetadataExtractorFactory;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
-import org.apache.pinot.spi.crypt.NoOpPinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -177,13 +179,13 @@ public class PinotSegmentUploadDownloadRestletResource {
   private SuccessResponse uploadSegment(@Nullable String tableName, 
FormDataMultiPart multiPart,
       boolean enableParallelPushProtection, HttpHeaders headers, Request 
request, boolean moveSegmentToFinalLocation) {
     String uploadTypeStr = null;
-    String crypterClassName = null;
+    String crypterClassNameInHeader = null;
     String downloadUri = null;
     if (headers != null) {
       extractHttpHeader(headers, 
CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
       extractHttpHeader(headers, 
CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
       uploadTypeStr = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
-      crypterClassName = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.CRYPTER);
+      crypterClassNameInHeader = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.CRYPTER);
       downloadUri = extractHttpHeader(headers, 
FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
     }
 
@@ -194,38 +196,31 @@ public class PinotSegmentUploadDownloadRestletResource {
       ControllerFilePathProvider provider = 
ControllerFilePathProvider.getInstance();
       String tempFileName = TMP_DIR_PREFIX + System.nanoTime();
       tempDecryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName);
+      tempEncryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName + ENCRYPTED_SUFFIX);
       tempSegmentDir = new File(provider.getUntarredFileTempDir(), 
tempFileName);
 
-      // Set default crypter to the noop crypter when no crypter header is sent
-      // In this case, the noop crypter will not do any operations, so the 
encrypted and decrypted file will have the same
-      // file path.
-      if (crypterClassName == null) {
-        crypterClassName = NoOpPinotCrypter.class.getSimpleName();
-        tempEncryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName);
-      } else {
-        tempEncryptedFile = new File(provider.getFileUploadTempDir(), 
tempFileName + ENCRYPTED_SUFFIX);
-      }
-
-      // TODO: Change when metadata upload added
-      String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+      boolean uploadedSegmentIsEncrypted = 
!Strings.isNullOrEmpty(crypterClassNameInHeader);
 
-      SegmentMetadata segmentMetadata;
+      File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : 
tempDecryptedFile;
       FileUploadDownloadClient.FileUploadType uploadType = 
getUploadType(uploadTypeStr);
       switch (uploadType) {
         case URI:
-          segmentMetadata =
-              getMetadataForURI(crypterClassName, downloadUri, 
tempEncryptedFile, tempDecryptedFile, tempSegmentDir,
-                  metadataProviderClass);
+          downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
           break;
         case SEGMENT:
-          getFileFromMultipart(multiPart, tempEncryptedFile);
-          segmentMetadata = getSegmentMetadata(crypterClassName, 
tempEncryptedFile, tempDecryptedFile, tempSegmentDir,
-              metadataProviderClass);
+          createSegmentFileFromMultipart(multiPart, dstFile);
           break;
         default:
           throw new UnsupportedOperationException("Unsupported upload type: " 
+ uploadType);
       }
 
+      if (uploadedSegmentIsEncrypted) {
+        decryptFile(crypterClassNameInHeader, tempEncryptedFile, 
tempDecryptedFile);
+      }
+
+      String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+      SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, 
tempSegmentDir, metadataProviderClass);
+
       // Fetch segment name
       String segmentName = segmentMetadata.getName();
 
@@ -251,6 +246,17 @@ public class PinotSegmentUploadDownloadRestletResource {
           _controllerMetrics, 
_leadControllerManager.isLeaderForTable(offlineTableName))
           .validateOfflineSegment(offlineTableName, segmentMetadata, 
tempSegmentDir);
 
+      // Encrypt segment
+      String crypterClassNameInTableConfig =
+          
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(offlineTableName);
+      Pair<String, File> encryptionInfo =
+          encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, 
uploadedSegmentIsEncrypted,
+              crypterClassNameInHeader, crypterClassNameInTableConfig, 
segmentName, tableName);
+
+      String crypterClassName = encryptionInfo.getLeft();
+      File finalSegmentFile = encryptionInfo.getRight();
+
+      // ZK download URI
       String zkDownloadUri;
       // This boolean is here for V1 segment upload, where we keep the segment 
in the downloadURI sent in the header.
       // We will deprecate this behavior eventually.
@@ -264,8 +270,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       }
 
       // Zk operations
-      completeZkOperations(enableParallelPushProtection, headers, 
tempEncryptedFile, rawTableName, segmentMetadata,
-          segmentName, zkDownloadUri, moveSegmentToFinalLocation);
+      completeZkOperations(enableParallelPushProtection, headers, 
finalSegmentFile, rawTableName, segmentMetadata,
+          segmentName, zkDownloadUri, moveSegmentToFinalLocation, 
crypterClassName);
 
       return new SuccessResponse("Successfully uploaded segment: " + 
segmentName + " of table: " + rawTableName);
     } catch (WebApplicationException e) {
@@ -290,6 +296,39 @@ public class PinotSegmentUploadDownloadRestletResource {
     return value;
   }
 
+  Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File 
tempEncryptedFile,
+      boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, 
String crypterClassNameInTableConfig,
+      String segmentName, String tableName) {
+
+    boolean segmentNeedsEncryption = 
!Strings.isNullOrEmpty(crypterClassNameInTableConfig);
+
+    // form the output
+    File finalSegmentFile =
+        (isUploadedSegmentEncrypted || segmentNeedsEncryption) ? 
tempEncryptedFile : tempDecryptedFile;
+    String crypterClassName = 
Strings.isNullOrEmpty(crypterClassNameInTableConfig) ? 
crypterUsedInUploadedSegment
+        : crypterClassNameInTableConfig;
+    ImmutablePair<String, File> out = ImmutablePair.of(crypterClassName, 
finalSegmentFile);
+
+    if (!segmentNeedsEncryption) {
+      return out;
+    }
+
+    if (isUploadedSegmentEncrypted && 
!crypterClassNameInTableConfig.equals(crypterUsedInUploadedSegment)) {
+      throw new ControllerApplicationException(LOGGER, String.format(
+          "Uploaded segment is encrypted with '%s' while table config requires 
'%s' as crypter "
+              + "(segment name = '%s', table name = '%s').", 
crypterUsedInUploadedSegment,
+          crypterClassNameInTableConfig, segmentName, tableName), 
Response.Status.INTERNAL_SERVER_ERROR);
+    }
+
+    // encrypt segment
+    PinotCrypter pinotCrypter = 
PinotCrypterFactory.create(crypterClassNameInTableConfig);
+    LOGGER.info("Using crypter class '{}' for encrypting '{}' to '{}' (segment 
name = '{}', table name = '{}').",
+        crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile, 
segmentName, tableName);
+    pinotCrypter.encrypt(tempDecryptedFile, tempEncryptedFile);
+
+    return out;
+  }
+
   private String getZkDownloadURIForSegmentUpload(String rawTableName, String 
segmentName) {
     ControllerFilePathProvider provider = 
ControllerFilePathProvider.getInstance();
     URI dataDirURI = provider.getDataDirURI();
@@ -303,46 +342,39 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
-  private SegmentMetadata getMetadataForURI(String crypterClassHeader, String 
currentSegmentLocationURI,
-      File tempEncryptedFile, File tempDecryptedFile, File tempSegmentDir, 
String metadataProviderClass)
+  private void downloadSegmentFileFromURI(String currentSegmentLocationURI, 
File destFile, String tableName)
       throws Exception {
-    SegmentMetadata segmentMetadata;
     if (currentSegmentLocationURI == null || 
currentSegmentLocationURI.isEmpty()) {
       throw new ControllerApplicationException(LOGGER, "Failed to get 
downloadURI, needed for URI upload",
           Response.Status.BAD_REQUEST);
     }
-    LOGGER.info("Downloading segment from {} to {}", 
currentSegmentLocationURI, tempEncryptedFile.getAbsolutePath());
-    SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI, 
tempEncryptedFile);
-    segmentMetadata = getSegmentMetadata(crypterClassHeader, 
tempEncryptedFile, tempDecryptedFile, tempSegmentDir,
-        metadataProviderClass);
-    return segmentMetadata;
+    LOGGER.info("Downloading segment from {} to {} for table {}", 
currentSegmentLocationURI, destFile.getAbsolutePath(),
+        tableName);
+    SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI, 
destFile);
   }
 
-  private SegmentMetadata getSegmentMetadata(String crypterClassHeader, File 
tempEncryptedFile, File tempDecryptedFile,
-      File tempSegmentDir, String metadataProviderClass)
+  private SegmentMetadata getSegmentMetadata(File tempDecryptedFile, File 
tempSegmentDir, String metadataProviderClass)
       throws Exception {
-
-    decryptFile(crypterClassHeader, tempEncryptedFile, tempDecryptedFile);
-
     // Call metadata provider to extract metadata with file object uri
     return 
MetadataExtractorFactory.create(metadataProviderClass).extractMetadata(tempDecryptedFile,
 tempSegmentDir);
   }
 
-  private void completeZkOperations(boolean enableParallelPushProtection, 
HttpHeaders headers, File tempEncryptedFile,
+  private void completeZkOperations(boolean enableParallelPushProtection, 
HttpHeaders headers, File uploadedSegmentFile,
       String rawTableName, SegmentMetadata segmentMetadata, String 
segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation)
+      boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
     URI finalSegmentLocationURI = URIUtils
         
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), 
rawTableName,
             URIUtils.encode(segmentName));
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, 
_controllerConf, _controllerMetrics);
-    zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, 
finalSegmentLocationURI, tempEncryptedFile,
-        enableParallelPushProtection, headers, zkDownloadURI, 
moveSegmentToFinalLocation);
+    zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, 
finalSegmentLocationURI, uploadedSegmentFile,
+        enableParallelPushProtection, headers, zkDownloadURI, 
moveSegmentToFinalLocation, crypter);
   }
 
-  private void decryptFile(String crypterClassHeader, File tempEncryptedFile, 
File tempDecryptedFile) {
-    PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassHeader);
-    LOGGER.info("Using crypter class {}", pinotCrypter.getClass().getName());
+  private void decryptFile(String crypterClassName, File tempEncryptedFile, 
File tempDecryptedFile) {
+    PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassName);
+    LOGGER.info("Using crypter class {} for decrypting {} to {}", 
pinotCrypter.getClass().getName(), tempEncryptedFile,
+        tempDecryptedFile);
     pinotCrypter.decrypt(tempEncryptedFile, tempDecryptedFile);
   }
 
@@ -422,7 +454,7 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
-  private File getFileFromMultipart(FormDataMultiPart multiPart, File dstFile)
+  private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, 
File dstFile)
       throws IOException {
     // Read segment file or segment metadata file and directly use that 
information to update zk
     Map<String, List<FormDataBodyPart>> segmentMetadataMap = 
multiPart.getFields();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index b33161b..fc31c1c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -59,7 +59,7 @@ public class ZKOperator {
 
   public void completeSegmentOperations(String rawTableName, SegmentMetadata 
segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean 
enableParallelPushProtection,
-      HttpHeaders headers, String zkDownloadURI, boolean 
moveSegmentToFinalLocation)
+      HttpHeaders headers, String zkDownloadURI, boolean 
moveSegmentToFinalLocation, String crypter)
       throws Exception {
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
@@ -69,7 +69,6 @@ public class ZKOperator {
         
_pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, 
segmentName);
     if (segmentMetadataZnRecord == null) {
       LOGGER.info("Adding new segment {} from table {}", segmentName, 
rawTableName);
-      String crypter = 
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation, zkDownloadURI, crypter,
           rawTableName, segmentName, moveSegmentToFinalLocation);
       return;
@@ -78,13 +77,14 @@ public class ZKOperator {
     LOGGER.info("Segment {} from table {} already exists, refreshing if 
necessary", segmentName, rawTableName);
 
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, 
offlineTableName, segmentName, segmentMetadataZnRecord,
-        moveSegmentToFinalLocation);
+        enableParallelPushProtection, headers, zkDownloadURI, crypter, 
offlineTableName, segmentName,
+        segmentMetadataZnRecord, moveSegmentToFinalLocation);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, 
HttpHeaders headers, String zkDownloadURI,
-      String offlineTableName, String segmentName, ZNRecord znRecord, boolean 
moveSegmentToFinalLocation)
+      String crypter, String offlineTableName, String segmentName, ZNRecord 
znRecord,
+      boolean moveSegmentToFinalLocation)
       throws Exception {
 
     OfflineSegmentZKMetadata existingSegmentZKMetadata = new 
OfflineSegmentZKMetadata(znRecord);
@@ -170,7 +170,6 @@ public class ZKOperator {
               zkDownloadURI);
         }
 
-        String crypter = 
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
         _pinotHelixResourceManager
             .refreshSegment(offlineTableName, segmentMetadata, 
existingSegmentZKMetadata, zkDownloadURI, crypter);
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a49c5e8..7abe59a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -458,6 +458,18 @@ public class PinotHelixResourceManager {
   public String getActualColumnName(String tableName, String columnName) {
     return _tableCache.getActualColumnName(tableName, columnName);
   }
+
+  /**
+   * Given a table name in any case, returns crypter class name defined in 
table config
+   * @param tableName table name in any case
+   * @return crypter class name
+   */
+  public String getCrypterClassNameFromTableConfig(String tableName) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableName);
+    Preconditions.checkNotNull(tableConfig, "Table config is not available for 
table '%s'", tableName);
+    return tableConfig.getValidationConfig().getCrypterClassName();
+  }
+
   /**
    * Table related APIs
    */
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java
new file mode 100644
index 0000000..ff497e9
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.io.File;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.crypt.NoOpPinotCrypter;
+import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PinotSegmentUploadDownloadRestletResourceTest {
+
+  private static final String TABLE_NAME = "table_abc";
+  private static final String SEGMENT_NAME = "segment_xyz";
+
+  private PinotSegmentUploadDownloadRestletResource _resource = new 
PinotSegmentUploadDownloadRestletResource();
+  private File _encryptedFile;
+  private File _decryptedFile;
+
+  @BeforeClass
+  public void setup()
+      throws Exception {
+
+    // create temp files
+    _encryptedFile = File.createTempFile("segment", ".enc");
+    _decryptedFile = File.createTempFile("segment", ".dec");
+    _encryptedFile.deleteOnExit();
+    _decryptedFile.deleteOnExit();
+
+    // configure pinot crypter
+    Configuration conf = new PropertiesConfiguration();
+    conf.addProperty("class.nooppinotcrypter", 
NoOpPinotCrypter.class.getName());
+    PinotCrypterFactory.init(conf);
+  }
+
+  @Test
+  public void testEncryptSegmentIfNeeded_crypterInTableConfig() {
+
+    // arrange
+    boolean uploadedSegmentIsEncrypted = false;
+    String crypterClassNameInTableConfig = "NoOpPinotCrypter";
+    String crypterClassNameUsedInUploadedSegment = null;
+
+    // act
+    Pair<String, File> encryptionInfo = _resource
+        .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, 
uploadedSegmentIsEncrypted,
+            crypterClassNameUsedInUploadedSegment, 
crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME);
+
+    // assert
+    assertEquals("NoOpPinotCrypter", encryptionInfo.getLeft());
+    assertEquals(_encryptedFile, encryptionInfo.getRight());
+  }
+
+  @Test
+  public void testEncryptSegmentIfNeeded_uploadedSegmentIsEncrypted() {
+
+    // arrange
+    boolean uploadedSegmentIsEncrypted = true;
+    String crypterClassNameInTableConfig = "NoOpPinotCrypter";
+    String crypterClassNameUsedInUploadedSegment = "NoOpPinotCrypter";
+
+    // act
+    Pair<String, File> encryptionInfo = _resource
+        .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, 
uploadedSegmentIsEncrypted,
+            crypterClassNameUsedInUploadedSegment, 
crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME);
+
+    // assert
+    assertEquals("NoOpPinotCrypter", encryptionInfo.getLeft());
+    assertEquals(_encryptedFile, encryptionInfo.getRight());
+  }
+
+  @Test(expectedExceptions = ControllerApplicationException.class, 
expectedExceptionsMessageRegExp = "Uploaded segment"
+      + " is encrypted with 'FancyCrypter' while table config requires 
'NoOpPinotCrypter' as crypter .*")
+  public void testEncryptSegmentIfNeeded_differentCrypters() {
+
+    // arrange
+    boolean uploadedSegmentIsEncrypted = true;
+    String crypterClassNameInTableConfig = "NoOpPinotCrypter";
+    String crypterClassNameUsedInUploadedSegment = "FancyCrypter";
+
+    // act
+    _resource.encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, 
uploadedSegmentIsEncrypted,
+        crypterClassNameUsedInUploadedSegment, crypterClassNameInTableConfig, 
SEGMENT_NAME, TABLE_NAME);
+  }
+
+  @Test
+  public void testEncryptSegmentIfNeeded_noEncryption() {
+
+    // arrange
+    boolean uploadedSegmentIsEncrypted = false;
+    String crypterClassNameInTableConfig = null;
+    String crypterClassNameUsedInUploadedSegment = null;
+
+    // act
+    Pair<String, File> encryptionInfo = _resource
+        .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, 
uploadedSegmentIsEncrypted,
+            crypterClassNameUsedInUploadedSegment, 
crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME);
+
+    // assert
+    assertNull(encryptionInfo.getLeft());
+    assertEquals(_decryptedFile, encryptionInfo.getRight());
+  }
+}
\ No newline at end of file
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 2b52497..bda1f09 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -66,9 +66,8 @@ public class ZKOperatorTest extends ControllerTest {
     when(segmentMetadata.getCrc()).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
     HttpHeaders httpHeaders = mock(HttpHeaders.class);
-    
when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("crypter");
     zkOperator.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, 
null, null, false, httpHeaders, "downloadUrl",
-        false);
+        false, "crypter");
 
     OfflineSegmentZKMetadata segmentZKMetadata =
         _helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, 
SEGMENT_NAME);
@@ -84,7 +83,7 @@ public class ZKOperatorTest extends ControllerTest {
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
     try {
       zkOperator.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, 
null, null, false, httpHeaders,
-          "otherDownloadUrl", false);
+          "otherDownloadUrl", false, null);
       fail();
     } catch (Exception e) {
       // Expected
@@ -94,10 +93,9 @@ public class ZKOperatorTest extends ControllerTest {
     // downloadURL and crypter
     
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
-    
when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("otherCrypter");
     zkOperator
         .completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null, 
null, false, httpHeaders, "otherDownloadUrl",
-            false);
+            false, "otherCrypter");
     segmentZKMetadata = 
_helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, SEGMENT_NAME);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
     // Push time should not change
@@ -113,12 +111,11 @@ public class ZKOperatorTest extends ControllerTest {
     // Refresh the segment with a different segment (different CRC)
     when(segmentMetadata.getCrc()).thenReturn("23456");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(789L);
-    
when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("otherCrypter");
     // Add a tiny sleep to guarantee that refresh time is different from the 
previous round
     Thread.sleep(10L);
     zkOperator
         .completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null, 
null, false, httpHeaders, "otherDownloadUrl",
-            false);
+            false, "otherCrypter");
     segmentZKMetadata = 
_helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, SEGMENT_NAME);
     assertEquals(segmentZKMetadata.getCrc(), 23456L);
     // Push time should not change
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 30451ab..8bc1567 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -39,6 +39,7 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
   private String _segmentAssignmentStrategy;
   private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
   private CompletionConfig _completionConfig;
+  private String _crypterClassName;
   // Possible values can be http or https. If this field is set, a Pinot server can download segments from peer servers
   // using the specified download scheme. Both realtime tables and offline tables can set this field.
   // For more usage of this field, please refer to this design doc:
@@ -158,9 +159,19 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
     return Integer.parseInt(_replicasPerPartition);
   }
 
-  public String getPeerSegmentDownloadScheme() { return _peerSegmentDownloadScheme; }
+  public String getPeerSegmentDownloadScheme() {
+    return _peerSegmentDownloadScheme;
+  }
 
   public void setPeerSegmentDownloadScheme(String peerSegmentDownloadScheme) {
     _peerSegmentDownloadScheme = peerSegmentDownloadScheme;
   }
+
+  public String getCrypterClassName() {
+    return _crypterClassName;
+  }
+
+  public void setCrypterClassName(String crypterClassName) {
+    _crypterClassName = crypterClassName;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java b/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java
index 1f5e83a..803147f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java
@@ -19,7 +19,9 @@
 package org.apache.pinot.spi.crypt;
 
 import java.io.File;
+import java.io.IOException;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,11 +39,21 @@ public class NoOpPinotCrypter implements PinotCrypter {
 
   @Override
   public void encrypt(File decryptedFile, File encryptedFile) {
-    return;
+    try {
+      FileUtils.copyFile(decryptedFile, encryptedFile);
+    } catch (IOException e) {
+      LOGGER.warn("Could not encrypt file");
+      FileUtils.deleteQuietly(encryptedFile);
+    }
   }
 
   @Override
   public void decrypt(File encryptedFile, File decryptedFile) {
-    return;
+    try {
+      FileUtils.copyFile(encryptedFile, decryptedFile);
+    } catch (IOException e) {
+      LOGGER.warn("Could not decrypt file");
+      FileUtils.deleteQuietly(decryptedFile);
+    }
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index a81836f..055b22d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -68,6 +68,7 @@ public class TableConfigBuilder {
   private String _peerSegmentDownloadScheme;
   private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
   private CompletionConfig _completionConfig;
+  private String _crypterClassName;
 
   // Tenant config related
   private String _brokerTenant;
@@ -173,6 +174,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  public TableConfigBuilder setCrypterClassName(String crypterClassName) {
+    _crypterClassName = crypterClassName;
+    return this;
+  }
+
   public TableConfigBuilder setBrokerTenant(String brokerTenant) {
     _brokerTenant = brokerTenant;
     return this;
@@ -318,6 +324,7 @@ public class TableConfigBuilder {
     if (_isLLC) {
       validationConfig.setReplicasPerPartition(_numReplicas);
     }
+    validationConfig.setCrypterClassName(_crypterClassName);
 
     // Tenant config
     TenantConfig tenantConfig = new TenantConfig(_brokerTenant, _serverTenant, _tagOverrideConfig);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to