This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 40a3152 Add segment encryption on Controller based on table config (#5617) 40a3152 is described below commit 40a31529213efa7a5d03eb4e0ca020aa4045e0ca Author: Sajjad <moradi.saj...@gmail.com> AuthorDate: Wed Jul 1 13:23:24 2020 -0700 Add segment encryption on Controller based on table config (#5617) * Add segment encryption on Controller based on table config * Applied PR's comments * Addressed PR's comments * Add license header + equality check on crypters * Addressed PR's comments * Fix log message Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com> Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com> --- .../common/utils/config/TableConfigUtils.java | 10 +- .../pinot/common/utils/helix/TableCache.java | 4 + .../common/utils/config/TableConfigSerDeTest.java | 6 +- .../PinotSegmentUploadDownloadRestletResource.java | 122 ++++++++++++-------- .../pinot/controller/api/upload/ZKOperator.java | 11 +- .../helix/core/PinotHelixResourceManager.java | 12 ++ ...otSegmentUploadDownloadRestletResourceTest.java | 125 +++++++++++++++++++++ .../controller/api/upload/ZKOperatorTest.java | 11 +- .../SegmentsValidationAndRetentionConfig.java | 13 ++- .../apache/pinot/spi/crypt/NoOpPinotCrypter.java | 16 ++- .../spi/utils/builder/TableConfigBuilder.java | 7 ++ 11 files changed, 270 insertions(+), 67 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java index 45610fa..2ee7b47 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java @@ -132,8 +132,8 @@ public class TableConfigUtils { } return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, - quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, - upsertConfig, ingestionConfig); + quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig, + ingestionConfig); } public static ZNRecord toZNRecord(TableConfig tableConfig) @@ -228,8 +228,10 @@ public class TableConfigUtils { } String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme(); if (peerSegmentDownloadScheme != null) { - if (!"http".equalsIgnoreCase(peerSegmentDownloadScheme) && !"https".equalsIgnoreCase(peerSegmentDownloadScheme)) { - throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of http nor https" ); + if (!"http".equalsIgnoreCase(peerSegmentDownloadScheme) && !"https" + .equalsIgnoreCase(peerSegmentDownloadScheme)) { + throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + + "' for peerSegmentDownloadScheme. Must be one of http nor https"); } } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java index 474adbf..6f46ac2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java @@ -80,6 +80,10 @@ public class TableCache { return columnName; } + public TableConfig getTableConfig(String tableName) { + return _tableConfigChangeListener._tableConfigMap.get(tableName); + } + class TableConfigChangeListener implements IZkChildListener, IZkDataListener { Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>(); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index af5e5c0..f877c3f 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -248,8 +248,10 @@ public class TableConfigSerDeTest { { // with SegmentsValidationAndRetentionConfig TableConfig tableConfig = tableConfigBuilder.setPeerSegmentDownloadScheme("http").build(); - checkSegmentsValidationAndRetentionConfig(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class)); - checkSegmentsValidationAndRetentionConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig))); + checkSegmentsValidationAndRetentionConfig( + JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class)); + checkSegmentsValidationAndRetentionConfig( + TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig))); } { // With ingestion config diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 576390d..1c9500e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.api.resources; +import com.google.common.base.Strings; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -54,6 +55,8 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.CommonConstants; @@ -71,7 +74,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.metadata.DefaultMetadataExtractor; import org.apache.pinot.core.metadata.MetadataExtractorFactory; import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; -import org.apache.pinot.spi.crypt.NoOpPinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.filesystem.PinotFS; @@ -177,13 +179,13 @@ public class PinotSegmentUploadDownloadRestletResource { private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiPart multiPart, boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation) { String uploadTypeStr = null; - String crypterClassName = null; + String crypterClassNameInHeader = null; String downloadUri = null; if (headers != null) { extractHttpHeader(headers, CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER); extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER); uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); - crypterClassName = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); + crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); downloadUri = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI); } @@ -194,38 +196,31 @@ public class PinotSegmentUploadDownloadRestletResource { ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); String tempFileName = TMP_DIR_PREFIX + System.nanoTime(); tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName); + tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX); tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName); - // Set default crypter to the noop crypter when no crypter header is sent - // In this case, the noop crypter will not do any operations, so the encrypted and decrypted file will have the same - // file path. - if (crypterClassName == null) { - crypterClassName = NoOpPinotCrypter.class.getSimpleName(); - tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName); - } else { - tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX); - } - - // TODO: Change when metadata upload added - String metadataProviderClass = DefaultMetadataExtractor.class.getName(); + boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader); - SegmentMetadata segmentMetadata; + File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile; FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr); switch (uploadType) { case URI: - segmentMetadata = - getMetadataForURI(crypterClassName, downloadUri, tempEncryptedFile, tempDecryptedFile, tempSegmentDir, - metadataProviderClass); + downloadSegmentFileFromURI(downloadUri, dstFile, tableName); break; case SEGMENT: - getFileFromMultipart(multiPart, tempEncryptedFile); - segmentMetadata = getSegmentMetadata(crypterClassName, tempEncryptedFile, tempDecryptedFile, tempSegmentDir, - metadataProviderClass); + createSegmentFileFromMultipart(multiPart, dstFile); break; default: throw new UnsupportedOperationException("Unsupported upload type: " + uploadType); } + if (uploadedSegmentIsEncrypted) { + decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile); + } + + String metadataProviderClass = DefaultMetadataExtractor.class.getName(); + SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass); + // Fetch segment name String segmentName = segmentMetadata.getName(); @@ -251,6 +246,17 @@ public class PinotSegmentUploadDownloadRestletResource { _controllerMetrics, _leadControllerManager.isLeaderForTable(offlineTableName)) .validateOfflineSegment(offlineTableName, segmentMetadata, tempSegmentDir); + // Encrypt segment + String crypterClassNameInTableConfig = + _pinotHelixResourceManager.getCrypterClassNameFromTableConfig(offlineTableName); + Pair<String, File> encryptionInfo = + encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, uploadedSegmentIsEncrypted, + crypterClassNameInHeader, crypterClassNameInTableConfig, segmentName, tableName); + + String crypterClassName = encryptionInfo.getLeft(); + File finalSegmentFile = encryptionInfo.getRight(); + + // ZK download URI String zkDownloadUri; // This boolean is here for V1 segment upload, where we keep the segment in the downloadURI sent in the header. // We will deprecate this behavior eventually. @@ -264,8 +270,8 @@ public class PinotSegmentUploadDownloadRestletResource { } // Zk operations - completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, rawTableName, segmentMetadata, - segmentName, zkDownloadUri, moveSegmentToFinalLocation); + completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, rawTableName, segmentMetadata, + segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName); return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + rawTableName); } catch (WebApplicationException e) { @@ -290,6 +296,39 @@ public class PinotSegmentUploadDownloadRestletResource { return value; } + Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File tempEncryptedFile, + boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, String crypterClassNameInTableConfig, + String segmentName, String tableName) { + + boolean segmentNeedsEncryption = !Strings.isNullOrEmpty(crypterClassNameInTableConfig); + + // form the output + File finalSegmentFile = + (isUploadedSegmentEncrypted || segmentNeedsEncryption) ? tempEncryptedFile : tempDecryptedFile; + String crypterClassName = Strings.isNullOrEmpty(crypterClassNameInTableConfig) ? crypterUsedInUploadedSegment + : crypterClassNameInTableConfig; + ImmutablePair<String, File> out = ImmutablePair.of(crypterClassName, finalSegmentFile); + + if (!segmentNeedsEncryption) { + return out; + } + + if (isUploadedSegmentEncrypted && !crypterClassNameInTableConfig.equals(crypterUsedInUploadedSegment)) { + throw new ControllerApplicationException(LOGGER, String.format( + "Uploaded segment is encrypted with '%s' while table config requires '%s' as crypter " + + "(segment name = '%s', table name = '%s').", crypterUsedInUploadedSegment, + crypterClassNameInTableConfig, segmentName, tableName), Response.Status.INTERNAL_SERVER_ERROR); + } + + // encrypt segment + PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassNameInTableConfig); + LOGGER.info("Using crypter class '{}' for encrypting '{}' to '{}' (segment name = '{}', table name = '{}').", + crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile, segmentName, tableName); + pinotCrypter.encrypt(tempDecryptedFile, tempEncryptedFile); + + return out; + } + private String getZkDownloadURIForSegmentUpload(String rawTableName, String segmentName) { ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); URI dataDirURI = provider.getDataDirURI(); @@ -303,46 +342,39 @@ public class PinotSegmentUploadDownloadRestletResource { } } - private SegmentMetadata getMetadataForURI(String crypterClassHeader, String currentSegmentLocationURI, - File tempEncryptedFile, File tempDecryptedFile, File tempSegmentDir, String metadataProviderClass) + private void downloadSegmentFileFromURI(String currentSegmentLocationURI, File destFile, String tableName) throws Exception { - SegmentMetadata segmentMetadata; if (currentSegmentLocationURI == null || currentSegmentLocationURI.isEmpty()) { throw new ControllerApplicationException(LOGGER, "Failed to get downloadURI, needed for URI upload", Response.Status.BAD_REQUEST); } - LOGGER.info("Downloading segment from {} to {}", currentSegmentLocationURI, tempEncryptedFile.getAbsolutePath()); - SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI, tempEncryptedFile); - segmentMetadata = getSegmentMetadata(crypterClassHeader, tempEncryptedFile, tempDecryptedFile, tempSegmentDir, - metadataProviderClass); - return segmentMetadata; + LOGGER.info("Downloading segment from {} to {} for table {}", currentSegmentLocationURI, destFile.getAbsolutePath(), + tableName); + SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI, destFile); } - private SegmentMetadata getSegmentMetadata(String crypterClassHeader, File tempEncryptedFile, File tempDecryptedFile, - File tempSegmentDir, String metadataProviderClass) + private SegmentMetadata getSegmentMetadata(File tempDecryptedFile, File tempSegmentDir, String metadataProviderClass) throws Exception { - - decryptFile(crypterClassHeader, tempEncryptedFile, tempDecryptedFile); - // Call metadata provider to extract metadata with file object uri return MetadataExtractorFactory.create(metadataProviderClass).extractMetadata(tempDecryptedFile, tempSegmentDir); } - private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempEncryptedFile, + private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile, String rawTableName, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI, - boolean moveSegmentToFinalLocation) + boolean moveSegmentToFinalLocation, String crypter) throws Exception { URI finalSegmentLocationURI = URIUtils .getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), rawTableName, URIUtils.encode(segmentName)); ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); - zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, finalSegmentLocationURI, tempEncryptedFile, - enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation); + zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile, + enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter); } - private void decryptFile(String crypterClassHeader, File tempEncryptedFile, File tempDecryptedFile) { - PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassHeader); - LOGGER.info("Using crypter class {}", pinotCrypter.getClass().getName()); + private void decryptFile(String crypterClassName, File tempEncryptedFile, File tempDecryptedFile) { + PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassName); + LOGGER.info("Using crypter class {} for decrypting {} to {}", pinotCrypter.getClass().getName(), tempEncryptedFile, + tempDecryptedFile); pinotCrypter.decrypt(tempEncryptedFile, tempDecryptedFile); } @@ -422,7 +454,7 @@ public class PinotSegmentUploadDownloadRestletResource { } } - private File getFileFromMultipart(FormDataMultiPart multiPart, File dstFile) + private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, File dstFile) throws IOException { // Read segment file or segment metadata file and directly use that information to update zk Map<String, List<FormDataBodyPart>> segmentMetadataMap = multiPart.getFields(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index b33161b..fc31c1c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -59,7 +59,7 @@ public class ZKOperator { public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, - HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation) + HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter) throws Exception { String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); String segmentName = segmentMetadata.getName(); @@ -69,7 +69,6 @@ public class ZKOperator { _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName); if (segmentMetadataZnRecord == null) { LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName); - String crypter = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER); processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter, rawTableName, segmentName, moveSegmentToFinalLocation); return; @@ -78,13 +77,14 @@ public class ZKOperator { LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName); processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, - enableParallelPushProtection, headers, zkDownloadURI, offlineTableName, segmentName, segmentMetadataZnRecord, - moveSegmentToFinalLocation); + enableParallelPushProtection, headers, zkDownloadURI, crypter, offlineTableName, segmentName, + segmentMetadataZnRecord, moveSegmentToFinalLocation); } private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI, - String offlineTableName, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation) + String crypter, String offlineTableName, String segmentName, ZNRecord znRecord, + boolean moveSegmentToFinalLocation) throws Exception { OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord); @@ -170,7 +170,6 @@ public class ZKOperator { zkDownloadURI); } - String crypter = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER); _pinotHelixResourceManager .refreshSegment(offlineTableName, segmentMetadata, existingSegmentZKMetadata, zkDownloadURI, crypter); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index a49c5e8..7abe59a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -458,6 +458,18 @@ public class PinotHelixResourceManager { public String getActualColumnName(String tableName, String columnName) { return _tableCache.getActualColumnName(tableName, columnName); } + + /** + * Given a table name in any case, returns crypter class name defined in table config + * @param tableName table name in any case + * @return crypter class name + */ + public String getCrypterClassNameFromTableConfig(String tableName) { + TableConfig tableConfig = _tableCache.getTableConfig(tableName); + Preconditions.checkNotNull(tableConfig, "Table config is not available for table '%s'", tableName); + return tableConfig.getValidationConfig().getCrypterClassName(); + } + /** * Table related APIs */ diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java new file mode 100644 index 0000000..ff497e9 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import java.io.File; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.spi.crypt.NoOpPinotCrypter; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class PinotSegmentUploadDownloadRestletResourceTest { + + private static final String TABLE_NAME = "table_abc"; + private static final String SEGMENT_NAME = "segment_xyz"; + + private PinotSegmentUploadDownloadRestletResource _resource = new PinotSegmentUploadDownloadRestletResource(); + private File _encryptedFile; + private File _decryptedFile; + + @BeforeClass + public void setup() + throws Exception { + + // create temp files + _encryptedFile = File.createTempFile("segment", ".enc"); + _decryptedFile = File.createTempFile("segment", ".dec"); + _encryptedFile.deleteOnExit(); + _decryptedFile.deleteOnExit(); + + // configure pinot crypter + Configuration conf = new PropertiesConfiguration(); + conf.addProperty("class.nooppinotcrypter", NoOpPinotCrypter.class.getName()); + PinotCrypterFactory.init(conf); + } + + @Test + public void testEncryptSegmentIfNeeded_crypterInTableConfig() { + + // arrange + boolean uploadedSegmentIsEncrypted = false; + String crypterClassNameInTableConfig = "NoOpPinotCrypter"; + String crypterClassNameUsedInUploadedSegment = null; + + // act + Pair<String, File> encryptionInfo = _resource + .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, uploadedSegmentIsEncrypted, + crypterClassNameUsedInUploadedSegment, crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME); + + // assert + assertEquals("NoOpPinotCrypter", encryptionInfo.getLeft()); + assertEquals(_encryptedFile, encryptionInfo.getRight()); + } + + @Test + public void testEncryptSegmentIfNeeded_uploadedSegmentIsEncrypted() { + + // arrange + boolean uploadedSegmentIsEncrypted = true; + String crypterClassNameInTableConfig = "NoOpPinotCrypter"; + String crypterClassNameUsedInUploadedSegment = "NoOpPinotCrypter"; + + // act + Pair<String, File> encryptionInfo = _resource + .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, uploadedSegmentIsEncrypted, + crypterClassNameUsedInUploadedSegment, crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME); + + // assert + assertEquals("NoOpPinotCrypter", encryptionInfo.getLeft()); + assertEquals(_encryptedFile, encryptionInfo.getRight()); + } + + @Test(expectedExceptions = ControllerApplicationException.class, expectedExceptionsMessageRegExp = "Uploaded segment" + + " is encrypted with 'FancyCrypter' while table config requires 'NoOpPinotCrypter' as crypter .*") + public void testEncryptSegmentIfNeeded_differentCrypters() { + + // arrange + boolean uploadedSegmentIsEncrypted = true; + String crypterClassNameInTableConfig = "NoOpPinotCrypter"; + String crypterClassNameUsedInUploadedSegment = "FancyCrypter"; + + // act + _resource.encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, uploadedSegmentIsEncrypted, + crypterClassNameUsedInUploadedSegment, crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME); + } + + @Test + public void testEncryptSegmentIfNeeded_noEncryption() { + + // arrange + boolean uploadedSegmentIsEncrypted = false; + String crypterClassNameInTableConfig = null; + String crypterClassNameUsedInUploadedSegment = null; + + // act + Pair<String, File> encryptionInfo = _resource + .encryptSegmentIfNeeded(_decryptedFile, _encryptedFile, uploadedSegmentIsEncrypted, + crypterClassNameUsedInUploadedSegment, crypterClassNameInTableConfig, SEGMENT_NAME, TABLE_NAME); + + // assert + assertNull(encryptionInfo.getLeft()); + assertEquals(_decryptedFile, encryptionInfo.getRight()); + } +} \ No newline at end of file diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java index 2b52497..bda1f09 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java @@ -66,9 +66,8 @@ public class ZKOperatorTest extends ControllerTest { when(segmentMetadata.getCrc()).thenReturn("12345"); when(segmentMetadata.getIndexCreationTime()).thenReturn(123L); HttpHeaders httpHeaders = mock(HttpHeaders.class); - when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("crypter"); zkOperator.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, "downloadUrl", - false); + false, "crypter"); OfflineSegmentZKMetadata segmentZKMetadata = _helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, SEGMENT_NAME); @@ -84,7 +83,7 @@ public class ZKOperatorTest extends ControllerTest { when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123"); try { zkOperator.completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, - "otherDownloadUrl", false); + "otherDownloadUrl", false, null); fail(); } catch (Exception e) { // Expected @@ -94,10 +93,9 @@ public class ZKOperatorTest extends ControllerTest { // downloadURL and crypter when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345"); when(segmentMetadata.getIndexCreationTime()).thenReturn(456L); - when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("otherCrypter"); zkOperator .completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, "otherDownloadUrl", - false); + false, "otherCrypter"); segmentZKMetadata = _helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, SEGMENT_NAME); assertEquals(segmentZKMetadata.getCrc(), 12345L); // Push time should not change @@ -113,12 +111,11 @@ public class ZKOperatorTest extends ControllerTest { // Refresh the segment with a different segment (different CRC) when(segmentMetadata.getCrc()).thenReturn("23456"); when(segmentMetadata.getIndexCreationTime()).thenReturn(789L); - when(httpHeaders.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER)).thenReturn("otherCrypter"); // Add a tiny sleep to guarantee that refresh time is different from the previous round Thread.sleep(10L); zkOperator .completeSegmentOperations(RAW_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, "otherDownloadUrl", - false); + false, "otherCrypter"); segmentZKMetadata = _helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, SEGMENT_NAME); assertEquals(segmentZKMetadata.getCrc(), 23456L); // Push time should not change diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java index 30451ab..8bc1567 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java @@ -39,6 +39,7 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { private String _segmentAssignmentStrategy; private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig; private CompletionConfig _completionConfig; + private String _crypterClassName; // Possible values can be http or https. If this field is set, a Pinot server can download segments from peer servers // using the specified download scheme. Both realtime tables and offline tables can set this field. // For more usage of this field, please refer to this design doc: @@ -158,9 +159,19 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { return Integer.parseInt(_replicasPerPartition); } - public String getPeerSegmentDownloadScheme() { return _peerSegmentDownloadScheme; } + public String getPeerSegmentDownloadScheme() { + return _peerSegmentDownloadScheme; + } public void setPeerSegmentDownloadScheme(String peerSegmentDownloadScheme) { _peerSegmentDownloadScheme = peerSegmentDownloadScheme; } + + public String getCrypterClassName() { + return _crypterClassName; + } + + public void setCrypterClassName(String crypterClassName) { + _crypterClassName = crypterClassName; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java b/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java index 1f5e83a..803147f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/crypt/NoOpPinotCrypter.java @@ -19,7 +19,9 @@ package org.apache.pinot.spi.crypt; import java.io.File; +import java.io.IOException; import org.apache.commons.configuration.Configuration; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,11 +39,21 @@ public class NoOpPinotCrypter implements PinotCrypter { @Override public void encrypt(File decryptedFile, File encryptedFile) { - return; + try { + FileUtils.copyFile(decryptedFile, encryptedFile); + } catch (IOException e) { + LOGGER.warn("Could not encrypt file"); + FileUtils.deleteQuietly(encryptedFile); + } } @Override public void decrypt(File encryptedFile, File decryptedFile) { - return; + try { + FileUtils.copyFile(encryptedFile, decryptedFile); + } catch (IOException e) { + LOGGER.warn("Could not decrypt file"); + FileUtils.deleteQuietly(decryptedFile); + } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java index a81836f..055b22d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java @@ -68,6 +68,7 @@ public class TableConfigBuilder { private String _peerSegmentDownloadScheme; private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig; private CompletionConfig _completionConfig; + private String _crypterClassName; // Tenant config related private String _brokerTenant; @@ -173,6 +174,11 @@ public class TableConfigBuilder { return this; } + public TableConfigBuilder setCrypterClassName(String crypterClassName) { + _crypterClassName = crypterClassName; + return this; + } + public TableConfigBuilder setBrokerTenant(String brokerTenant) { _brokerTenant = brokerTenant; return this; @@ -318,6 +324,7 @@ public class TableConfigBuilder { if (_isLLC) { validationConfig.setReplicasPerPartition(_numReplicas); } + validationConfig.setCrypterClassName(_crypterClassName); // Tenant config TenantConfig tenantConfig = new TenantConfig(_brokerTenant, _serverTenant, _tagOverrideConfig); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org