This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch deep.store.dir.structure
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/deep.store.dir.structure by this push:
     new 48d7602  Revert deep store directory structure changes after introducing upsert (#6974)
48d7602 is described below

commit 48d760248917e621b635b01982bce1e5805b7975
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Tue May 25 12:16:21 2021 -0700

    Revert deep store directory structure changes after introducing upsert (#6974)
---
 .../metadata/segment/ColumnPartitionMetadata.java  |   4 +-
 .../common/utils/FileUploadDownloadClient.java     |  24 ---
 .../apache/pinot/common/utils/SegmentUtils.java    |  68 -------
 .../apache/pinot/controller/ControllerConf.java    |   5 +-
 .../PinotSegmentUploadDownloadRestletResource.java |  68 +++----
 .../pinot/controller/api/upload/ZKOperator.java    |  55 +++---
 .../helix/core/PinotHelixResourceManager.java      |  96 +++------
 .../segment/RealtimeSegmentAssignment.java         |  35 ++--
 .../helix/core/util/ZKMetadataUtils.java           |  32 +--
 .../api/PinotSegmentRestletResourceTest.java       |   6 +-
 .../pinot/controller/api/TableViewsTest.java       |   2 +-
 .../controller/api/upload/ZKOperatorTest.java      |  10 +-
 .../helix/ControllerInstanceToggleTest.java        |   2 +-
 .../controller/helix/ControllerSentinelTestV2.java |   6 +-
 .../controller/helix/PinotResourceManagerTest.java |  99 ++-------
 .../TableRebalancerClusterStatelessTest.java       |   4 +-
 .../helix/core/retention/RetentionManagerTest.java |   4 +-
 .../controller/utils/SegmentMetadataMockUtils.java |   8 -
 .../validation/ValidationManagerTest.java          |   4 +-
 .../manager/realtime/RealtimeTableDataManager.java |  26 +--
 .../tests/BaseClusterIntegrationTest.java          |  25 ---
 .../tests/BasicAuthRealtimeIntegrationTest.java    |   6 +-
 .../UpsertTableSegmentUploadIntegrationTest.java   | 219 ---------------------
 .../src/test/resources/upsert_table_test.schema    |  33 ----
 .../src/test/resources/upsert_test.tar.gz          | Bin 9911 -> 0 bytes
 .../upsert/PartitionUpsertMetadataManager.java     |   2 -
 .../apache/pinot/spi/utils/CommonConstants.java    |   7 +-
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |   5 +-
 .../pinot/tools/perf/PerfBenchmarkRunner.java      |   2 +-
 29 files changed, 155 insertions(+), 702 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
index 537f01e..0220c4f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
@@ -34,8 +34,8 @@ import org.apache.commons.lang3.StringUtils;
 /**
  * Class for partition related column metadata:
  * <ul>
- * <li>The name of the Partition function used to map the column values to their partitions</li>
- * <li>Total number of partitions</li>
+ * <li>Partition function</li>
+ * <li>Number of total partitions</li>
  * <li>Set of partitions the column contains</li>
  * </ul>
 */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index ba32144..c874891 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -62,7 +62,6 @@ import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
@@ -92,7 +91,6 @@ public class FileUploadDownloadClient implements Closeable {
   public static class QueryParameters {
     public static final String ENABLE_PARALLEL_PUSH_PROTECTION = "enableParallelPushProtection";
     public static final String TABLE_NAME = "tableName";
-    public static final String TABLE_TYPE = "tableType";
   }

   public enum FileUploadType {
@@ -690,28 +688,6 @@ public class FileUploadDownloadClient implements Closeable {
   }

   /**
-   * Upload segment with segment file using default settings. Include table name and type as a request parameters.
-   *
-   * @param uri URI
-   * @param segmentName Segment name
-   * @param segmentFile Segment file
-   * @param tableName Table name with or without type suffix
-   * @param tableType Table type
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String tableName,
-      TableType tableType)
-      throws IOException, HttpErrorStatusException {
-    // Add table name and type request parameters
-    NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, tableName);
-    NameValuePair tableTypeValuePair = new BasicNameValuePair(QueryParameters.TABLE_TYPE, tableType.name());
-    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, tableTypeValuePair);
-    return uploadSegment(uri, segmentName, segmentFile, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
-  }
-
-  /**
    * Upload segment with segment file input stream.
    *
    * Note: table name has to be set as a parameter.
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
deleted file mode 100644
index 84c7a79..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.utils;
-
-import com.google.common.base.Preconditions;
-import java.util.Set;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-
-
-// Util functions related to segments.
-public class SegmentUtils {
-  // Returns the partition id of a realtime segment based segment name and segment metadata info retrieved via Helix.
-  // Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution
-  // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
-      HelixManager helixManager, String partitionColumn) {
-    // A fast path if the segmentName is a LLC segment name and we can get the partition id from the name directly.
-    if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-      return new LLCSegmentName(segmentName).getPartitionGroupId();
-    }
-    // Otherwise, retrieve the partition id from the segment zk metadata. Currently only realtime segments from upsert
-    // enabled tables have partition ids in their segment metadata.
-    RealtimeSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
-        .getRealtimeSegmentZKMetadata(helixManager.getHelixPropertyStore(), realtimeTableName, segmentName);
-    Preconditions
-        .checkState(segmentZKMetadata != null, "Failed to find segment ZK metadata for segment: %s of table: %s",
-            segmentName, realtimeTableName);
-    return getSegmentPartitionIdFromZkMetaData(realtimeTableName, segmentZKMetadata, partitionColumn);
-  }
-
-  private static int getSegmentPartitionIdFromZkMetaData(String realtimeTableName,
-      RealtimeSegmentZKMetadata segmentZKMetadata, String partitionColumn) {
-    String segmentName = segmentZKMetadata.getSegmentName();
-    Preconditions.checkState(segmentZKMetadata.getPartitionMetadata() != null,
-        "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata", segmentName,
-        realtimeTableName);
-
-    ColumnPartitionMetadata partitionMetadata =
-        segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(partitionColumn);
-    Preconditions.checkState(partitionMetadata != null,
-        "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata for column: %s. Check if the table is an upsert table.",
-        segmentName, realtimeTableName, partitionColumn);
-    Set<Integer> partitions = partitionMetadata.getPartitions();
-    Preconditions.checkState(partitions.size() == 1,
-        "Segment ZK metadata for segment: %s of table: %s contains multiple partitions for column: %s with %s",
-        segmentName, realtimeTableName, partitionColumn, partitions);
-    return partitions.iterator().next();
-  }
-}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index ce91c01..9d47b27 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -80,8 +80,6 @@ public class ControllerConf extends PinotConfiguration {
         "controller.offline.segment.interval.checker.frequencyInSeconds";
     public static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
         "controller.realtime.segment.validation.frequencyInSeconds";
-    public static final String REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS =
-        "controller.realtime.segment.validation.initialDelayInSeconds";
     public static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
         "controller.broker.resource.validation.frequencyInSeconds";
     public static final String BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
@@ -630,8 +628,7 @@ public class ControllerConf extends PinotConfiguration {
   }

   public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
-    return getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
-        getPeriodicTaskInitialDelayInSeconds());
+    return getPeriodicTaskInitialDelayInSeconds();
   }

   public long getPinotTaskManagerInitialDelaySeconds() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 35a3e07..7584ab0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -184,7 +184,7 @@ public class PinotSegmentUploadDownloadRestletResource {
     return builder.build();
   }

-  private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, FormDataMultiPart multiPart,
+  private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiPart multiPart,
       boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation) {
     String uploadTypeStr = null;
     String crypterClassNameInHeader = null;
@@ -248,36 +248,24 @@ public class PinotSegmentUploadDownloadRestletResource {
         LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from segment metadata)", segmentName,
             tableName, uploadType);
       }
-      String tableNameWithType;
-      if (tableType == TableType.OFFLINE) {
-        tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
-      } else {
-        if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) {
-          throw new UnsupportedOperationException(
-              "Upload segment to non-upsert realtime table is not supported " + rawTableName);
-        }
-        tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
-      }
-
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
       String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
       LOGGER.info("Processing upload request for segment: {} of table: {} from client: {}, ingestion descriptor: {}",
-          segmentName, tableNameWithType, clientAddress, ingestionDescriptor);
+          segmentName, offlineTableName, clientAddress, ingestionDescriptor);

-      // Skip segment validation if upload is to an offline table and only segment metadata. Skip segment validation for
-      // realtime tables because the feature is experimental and only applicable to upsert enabled table currently.
-      if (tableType == TableType.OFFLINE && uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
+      // Skip segment validation if upload only segment metadata
+      if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
         // Validate segment
         new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
-            _controllerMetrics, _leadControllerManager.isLeaderForTable(tableNameWithType))
-            .validateOfflineSegment(tableNameWithType, segmentMetadata, tempSegmentDir);
+            _controllerMetrics, _leadControllerManager.isLeaderForTable(offlineTableName)).validateOfflineSegment(offlineTableName, segmentMetadata, tempSegmentDir);
       }

       // Encrypt segment
       String crypterClassNameInTableConfig =
-          _pinotHelixResourceManager.getCrypterClassNameFromTableConfig(tableNameWithType);
+          _pinotHelixResourceManager.getCrypterClassNameFromTableConfig(offlineTableName);
       Pair<String, File> encryptionInfo =
           encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, uploadedSegmentIsEncrypted,
-              crypterClassNameInHeader, crypterClassNameInTableConfig, segmentName, tableNameWithType);
+              crypterClassNameInHeader, crypterClassNameInTableConfig, segmentName, tableName);

       String crypterClassName = encryptionInfo.getLeft();
       File finalSegmentFile = encryptionInfo.getRight();
@@ -289,17 +277,17 @@ public class PinotSegmentUploadDownloadRestletResource {
       if (!moveSegmentToFinalLocation) {
         LOGGER
            .info("Setting zkDownloadUri: to {} for segment: {} of table: {}, skipping move", downloadUri, segmentName,
-                tableNameWithType);
+                offlineTableName);
         zkDownloadUri = downloadUri;
       } else {
-        zkDownloadUri = getZkDownloadURIForSegmentUpload(tableNameWithType, segmentName);
+        zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName, segmentName);
       }

       // Zk operations
-      completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, tableNameWithType, segmentMetadata,
+      completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, rawTableName, segmentMetadata,
           segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName);

-      return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
+      return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + rawTableName);
     } catch (WebApplicationException e) {
       throw e;
     } catch (Exception e) {
@@ -324,7 +312,7 @@ public class PinotSegmentUploadDownloadRestletResource {

   Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File tempEncryptedFile,
       boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, String crypterClassNameInTableConfig,
-      String segmentName, String tableNameWithType) {
+      String segmentName, String tableName) {
     boolean segmentNeedsEncryption = !Strings.isNullOrEmpty(crypterClassNameInTableConfig);
@@ -343,13 +331,13 @@ public class PinotSegmentUploadDownloadRestletResource {
       throw new ControllerApplicationException(LOGGER, String.format(
           "Uploaded segment is encrypted with '%s' while table config requires '%s' as crypter "
               + "(segment name = '%s', table name = '%s').", crypterUsedInUploadedSegment,
-          crypterClassNameInTableConfig, segmentName, tableNameWithType), Response.Status.INTERNAL_SERVER_ERROR);
+          crypterClassNameInTableConfig, segmentName, tableName), Response.Status.INTERNAL_SERVER_ERROR);
     }

     // encrypt segment
     PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassNameInTableConfig);
     LOGGER.info("Using crypter class '{}' for encrypting '{}' to '{}' (segment name = '{}', table name = '{}').",
-        crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile, segmentName, tableNameWithType);
+        crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile, segmentName, tableName);
     pinotCrypter.encrypt(tempDecryptedFile, tempEncryptedFile);

     return out;
@@ -386,14 +374,14 @@ public class PinotSegmentUploadDownloadRestletResource {
   }

   private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile,
-      String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
+      String rawTableName, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
       boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
     URI finalSegmentLocationURI = URIUtils
-        .getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), tableNameWithType,
+        .getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), rawTableName,
            URIUtils.encode(segmentName));
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
-    zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile,
+    zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile,
         enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter);
   }
@@ -416,13 +404,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
   public void uploadSegmentAsJson(String segmentJsonStr,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
-      @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, enableParallelPushProtection,
-              headers, request, false));
+      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -438,13 +423,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // For the multipart endpoint, we will always move segment to final location regardless of the segment endpoint.
   public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
-      @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection,
-              headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -462,13 +444,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // endpoint in how it moves the segment to a Pinot-determined final directory.
   public void uploadSegmentAsJsonV2(String segmentJsonStr,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
-      @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, enableParallelPushProtection,
-              headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -484,13 +463,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // This behavior does not differ from v1 of the same endpoint.
   public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
-      @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection,
-              headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 0c6ad65..7b743c6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -57,36 +57,33 @@ public class ZKOperator {
     _controllerMetrics = controllerMetrics;
   }

-  public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
+  public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
-    ZNRecord segmentMetadataZnRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+
+    // Brand new segment, not refresh, directly add the segment
+    ZNRecord segmentMetadataZnRecord =
+        _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
     if (segmentMetadataZnRecord == null) {
-      LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
+      LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          tableNameWithType, segmentName, moveSegmentToFinalLocation);
+          rawTableName, segmentName, moveSegmentToFinalLocation);
       return;
     }

-    // TODO Allow segment refreshing for realtime tables.
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      throw new ControllerApplicationException(LOGGER,
-          "Refresh existing segment " + segmentName + " for realtime table " + tableNameWithType + " is not yet supported ",
-          Response.Status.NOT_IMPLEMENTED
-      );
-    }
+    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);

-    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
+        enableParallelPushProtection, headers, zkDownloadURI, crypter, offlineTableName, segmentName,
         segmentMetadataZnRecord, moveSegmentToFinalLocation);
   }

   private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
+      String crypter, String offlineTableName, String segmentName, ZNRecord znRecord,
       boolean moveSegmentToFinalLocation)
       throws Exception {
@@ -94,7 +91,7 @@ public class ZKOperator {
     long existingCrc = existingSegmentZKMetadata.getCrc();

     // Check if CRC match when IF-MATCH header is set
-    checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+    checkCRC(headers, offlineTableName, segmentName, existingCrc);

     // Check segment upload start time when parallel push protection enabled
     if (enableParallelPushProtection) {
@@ -104,12 +101,12 @@ public class ZKOperator {
       if (System.currentTimeMillis() - segmentUploadStartTime > _controllerConf.getSegmentUploadTimeoutInMillis()) {
         // Last segment upload does not finish properly, replace the segment
         LOGGER
-            .error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName, tableNameWithType);
+            .error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName, offlineTableName);
         _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
       } else {
         // Another segment upload is in progress
         throw new ControllerApplicationException(LOGGER,
-            "Another segment upload is in progress for segment: " + segmentName + " of table: " + tableNameWithType
+            "Another segment upload is in progress for segment: " + segmentName + " of table: " + offlineTableName
                 + ", retry later", Response.Status.CONFLICT);
       }
     }
@@ -117,9 +114,9 @@ public class ZKOperator {
     // Lock the segment by setting the upload start time in ZK
     existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
     if (!_pinotHelixResourceManager
-        .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, znRecord.getVersion())) {
+        .updateZkMetadata(offlineTableName, existingSegmentZKMetadata, znRecord.getVersion())) {
       throw new ControllerApplicationException(LOGGER,
-          "Failed to lock the segment: " + segmentName + " of table: " + tableNameWithType + ", retry later",
+          "Failed to lock the segment: " + segmentName + " of table: " + offlineTableName + ", retry later",
           Response.Status.CONFLICT);
     }
   }
@@ -155,9 +152,9 @@ public class ZKOperator {
         // (creation time is not included in the crc)
         existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
         existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
-        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata)) {
+        if (!_pinotHelixResourceManager.updateZkMetadata(offlineTableName, existingSegmentZKMetadata)) {
          throw new RuntimeException(
-              "Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
+              "Failed to update ZK metadata for segment: " + segmentName + " of table: " + offlineTableName);
         }
       } else {
         // New segment is different with the existing one, update ZK metadata and refresh the segment
@@ -169,16 +166,16 @@ public class ZKOperator {
           LOGGER.info("Moved segment {} from temp location {} to {}", segmentName,
               currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
         } else {
-          LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType,
+          LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, offlineTableName,
              zkDownloadURI);
         }

        _pinotHelixResourceManager
-            .refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, zkDownloadURI, crypter);
+            .refreshSegment(offlineTableName, segmentMetadata, existingSegmentZKMetadata, zkDownloadURI, crypter);
       }
     } catch (Exception e) {
-      if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata)) {
-        LOGGER.error("Failed to update ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
+      if (!_pinotHelixResourceManager.updateZkMetadata(offlineTableName, existingSegmentZKMetadata)) {
+        LOGGER.error("Failed to update ZK metadata for segment: {} of table: {}", segmentName, offlineTableName);
       }
       throw e;
     }
@@ -204,7 +201,7 @@ public class ZKOperator {
   }

   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
-      File currentSegmentLocation, String zkDownloadURI, String crypter, String tableNameWithType, String segmentName,
+      File currentSegmentLocation, String zkDownloadURI, String crypter, String rawTableName, String segmentName,
       boolean moveSegmentToFinalLocation) {
     // For v1 segment uploads, we will not move the segment
     if (moveSegmentToFinalLocation) {
@@ -214,14 +211,14 @@ public class ZKOperator {
            .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
                finalSegmentLocationURI.getPath());
       } catch (Exception e) {
-        LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e);
+        LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, rawTableName, e);
         throw new RuntimeException(e);
       }
     } else {
-      LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType,
+      LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName,
          zkDownloadURI);
     }
-    _pinotHelixResourceManager.addNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
+    _pinotHelixResourceManager.addNewSegment(rawTableName, segmentMetadata, zkDownloadURI, crypter);
   }

   private void moveSegmentToPermanentDirectory(File currentSegmentLocation, URI finalSegmentLocationURI)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8441d12..b1a9369 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -86,7 +86,6 @@ import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.HashUtil;
@@ -121,7 +120,6 @@ import org.apache.pinot.spi.config.table.TableCustomConfig;
 import org.apache.pinot.spi.config.table.TableStats;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TenantConfig;
-import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.data.Schema;
@@ -1625,110 +1623,72 @@ public class PinotHelixResourceManager {
     return instanceSet;
   }

-  public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, String downloadUrl) {
-    addNewSegment(tableNameWithType, segmentMetadata, downloadUrl, null);
+  public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, String downloadUrl) {
+    addNewSegment(tableName, segmentMetadata, downloadUrl, null);
   }

-  public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, String downloadUrl,
+  public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, String downloadUrl,
       @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
-    InstancePartitionsType instancePartitionsType;
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
     // NOTE: must first set the segment ZK metadata before assigning segment to instances because segment assignment
     // might need them to determine the partition of the segment, and server will need them to download the segment
-    ZNRecord znRecord;
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      Preconditions.checkState(isUpsertTable(tableNameWithType),
-          "Upload segment " + segmentName + " for non upsert enabled realtime table " + tableNameWithType
-              + " is not supported");
-      // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
-      // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
-      // TODO When upload segments is open to all realtime tables, we should change the type to COMPLETED instead.
-      // In addition, RealtimeSegmentAssignment.assignSegment(..) method should be updated so that the method does not
-      // assign segments to CONSUMING instance partition only.
-      instancePartitionsType = InstancePartitionsType.CONSUMING;
-      // Build the realtime segment zk metadata with necessary fields.
-      LLCRealtimeSegmentZKMetadata segmentZKMetadata = new LLCRealtimeSegmentZKMetadata();
-      ZKMetadataUtils
-          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.REALTIME);
-      segmentZKMetadata.setDownloadUrl(downloadUrl);
-      segmentZKMetadata.setCrypterName(crypter);
-      segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
-      znRecord = segmentZKMetadata.toZNRecord();
-    } else {
-      instancePartitionsType = InstancePartitionsType.OFFLINE;
-      // Build the offline segment zk metadata with necessary fields.
-      OfflineSegmentZKMetadata segmentZKMetadata = new OfflineSegmentZKMetadata();
-      ZKMetadataUtils
-          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
-      segmentZKMetadata.setDownloadUrl(downloadUrl);
-      segmentZKMetadata.setCrypterName(crypter);
-      segmentZKMetadata.setPushTime(System.currentTimeMillis());
-      znRecord = segmentZKMetadata.toZNRecord();
-    }
+    OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
+    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata);
+    offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
+    offlineSegmentZKMetadata.setCrypterName(crypter);
+    offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis());
     String segmentZKMetadataPath =
-        ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
+        ZKMetadataProvider.constructPropertyStorePathForSegment(offlineTableName, segmentName);
     Preconditions.checkState(
-        _propertyStore.set(segmentZKMetadataPath, znRecord, AccessOption.PERSISTENT),
-        "Failed to set segment ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName);
-    LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType);
-    assignTableSegment(tableNameWithType, segmentName, segmentZKMetadataPath, instancePartitionsType);
-  }
-
+        _propertyStore.set(segmentZKMetadataPath, offlineSegmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT),
+        "Failed to set segment ZK metadata for table: " + offlineTableName + ", segment: " + segmentName);
+    LOGGER.info("Added segment: {} of table: {} to property store", segmentName, offlineTableName);

-  private void assignTableSegment(String tableNameWithType, String segmentName, String segmentZKMetadataPath,
-      InstancePartitionsType instancePartitionsType) {
     // Assign instances for the segment and add it into IdealState
     try {
-      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      TableConfig offlineTableConfig = getTableConfig(offlineTableName);
       Preconditions
-          .checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+          .checkState(offlineTableConfig != null, "Failed to find table config for table: " + offlineTableName);
       SegmentAssignment segmentAssignment =
-          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, offlineTableConfig);
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections
-          .singletonMap(instancePartitionsType, InstancePartitionsUtils
-              .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType));
-      synchronized (getTableUpdaterLock(tableNameWithType)) {
-        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
+          .singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixZkManager, offlineTableConfig, InstancePartitionsType.OFFLINE));
+      synchronized (getTableUpdaterLock(offlineTableName)) {
+        HelixHelper.updateIdealState(_helixZkManager, offlineTableName, idealState -> {
           assert idealState != null;
           Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
           if (currentAssignment.containsKey(segmentName)) {
             LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
-                tableNameWithType);
+                offlineTableName);
           } else {
             List<String> assignedInstances =
                 segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
             LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
-                tableNameWithType);
+                offlineTableName);
             currentAssignment.put(segmentName,
                 SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
           }
           return idealState;
         });
-        LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType);
+        LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, offlineTableName);
       }
     } catch (Exception e) {
       LOGGER
          .error("Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata",
-              segmentName, tableNameWithType, e);
+              segmentName, offlineTableName, e);
       if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) {
-        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
+        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, offlineTableName);
       } else {
         LOGGER
-            .error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
+            .error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, offlineTableName);
       }
       throw e;
     }
   }

-  public boolean isUpsertTable(String tableName) {
-    TableConfig realtimeTableConfig = getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName));
-    if (realtimeTableConfig == null) {
-      return false;
-    }
-    UpsertConfig upsertConfig = realtimeTableConfig.getUpsertConfig();
-    return ((upsertConfig != null) && upsertConfig.getMode() != UpsertConfig.Mode.NONE);
-  }
-
   private Object getTableUpdaterLock(String offlineTableName) {
     return _tableUpdaterLocks[(offlineTableName.hashCode() & Integer.MAX_VALUE) % _tableUpdaterLocks.length];
   }
@@ -1757,7 +1717,7 @@ public class PinotHelixResourceManager {
     // ZK metadata to refresh the segment (server will compare the segment ZK metadata with the local metadata to decide
     // whether to download the new segment; broker will update the the segment partition info & time boundary based on
     // the segment ZK metadata)
-    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
+    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata);
     offlineSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
     offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
     offlineSegmentZKMetadata.setCrypterName(crypter);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 0e1974a..4f2ee84 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -32,9 +32,8 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
-import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -86,19 +85,15 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
   private HelixManager _helixManager;
   private String _realtimeTableName;
   private int _replication;
-  private String _partitionColumn;

   @Override
   public void init(HelixManager helixManager, TableConfig tableConfig) {
     _helixManager = helixManager;
     _realtimeTableName = tableConfig.getTableName();
     _replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
-        tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
-    _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
-    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {}, partitionColumn: {} for table: {}",
-        _replication, _partitionColumn, _realtimeTableName);
+    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} for table: {}", _replication,
+        _realtimeTableName);
   }

   @Override
@@ -141,8 +136,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
    * Helper method to assign instances for CONSUMING segment based on the segment partition id and instance partitions.
    */
   private List<String> assignConsumingSegment(String segmentName, InstancePartitions instancePartitions) {
-    int partitionGroupId =
-        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+    int partitionGroupId = new LLCSegmentName(segmentName).getPartitionGroupId();
     int numReplicaGroups = instancePartitions.getNumReplicaGroups();
     if (numReplicaGroups == 1) {
@@ -180,8 +174,9 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
-      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable List<Tier> sortedTiers, @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
+      Configuration config) {
     InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
     InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
@@ -303,7 +298,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
    * Rebalances segments in the current assignment using the instancePartitions and returns new assignment
    */
   private Map<String, Map<String, String>> reassignSegments(String instancePartitionType,
-      Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions, boolean bootstrap) {
+      Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions,
+      boolean bootstrap) {
     Map<String, Map<String, String>> newAssignment;
     if (bootstrap) {
       LOGGER.info("Bootstrapping segment assignment for {} segments of table: {}", instancePartitionType,
@@ -320,8 +316,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
       if (instancePartitions.getNumReplicaGroups() == 1) {
         // Non-replica-group based assignment
-        List<String> instances =
-            SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
+        List<String> instances = SegmentAssignmentUtils
+            .getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
         newAssignment = SegmentAssignmentUtils
             .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication);
       } else {
@@ -329,8 +325,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
         Map<Integer, List<String>> partitionGroupIdToSegmentsMap = new HashMap<>();
         for (String segmentName : currentAssignment.keySet()) {
-          int partitionGroupId = SegmentUtils
-              .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+          int partitionGroupId = new LLCSegmentName(segmentName).getPartitionGroupId();
           partitionGroupIdToSegmentsMap.computeIfAbsent(partitionGroupId, k -> new ArrayList<>()).add(segmentName);
         }
@@ -343,7 +338,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
         }

         newAssignment = SegmentAssignmentUtils
-            .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionGroupIdToSegmentsMap);
+            .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions,
+                partitionGroupIdToSegmentsMap);
       }
     }
     return newAssignment;
@@ -364,8 +360,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
       // Replica-group based assignment
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+      int segmentPartitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
       int numPartitions = instancePartitions.getNumPartitions();
       int partitionGroupId = segmentPartitionId % numPartitions;
       return SegmentAssignmentUtils
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 324ea00..058f574 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -21,8 +21,8 @@ package org.apache.pinot.controller.helix.core.util;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.segment.local.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.local.segment.index.metadata.SegmentMetadataImpl;
@@ -35,24 +35,24 @@ public class ZKMetadataUtils {
   private ZKMetadataUtils() {
   }

-  public static void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata, SegmentMetadata segmentMetadata,
-      SegmentType segmentType) {
-    segmentZKMetadata.setSegmentName(segmentMetadata.getName());
-    segmentZKMetadata.setTableName(segmentMetadata.getTableName());
-    segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
-    segmentZKMetadata.setSegmentType(segmentType);
+  public static void updateSegmentMetadata(OfflineSegmentZKMetadata offlineSegmentZKMetadata,
+      SegmentMetadata segmentMetadata) {
+    offlineSegmentZKMetadata.setSegmentName(segmentMetadata.getName());
+    offlineSegmentZKMetadata.setTableName(segmentMetadata.getTableName());
+    offlineSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
+    offlineSegmentZKMetadata.setSegmentType(SegmentType.OFFLINE);
     if (segmentMetadata.getTimeInterval() != null) {
-      segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
-      segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
-      segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+      offlineSegmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
+      offlineSegmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
+      offlineSegmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
     }
-    segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
-    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
-    segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+    offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
+    offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
     SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
         new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
-            segmentZKMetadata.getCustomMap());
-    segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+            offlineSegmentZKMetadata.getCustomMap());
+    offlineSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));

     // Extract column partition metadata (if any), and set it into segment ZK metadata.
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
@@ -73,7 +73,7 @@ public class ZKMetadataUtils {
     }

     if (!columnPartitionMap.isEmpty()) {
-      segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(columnPartitionMap));
+      offlineSegmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(columnPartitionMap));
     }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index 722e551..fac5c18 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -61,8 +61,7 @@ public class PinotSegmentRestletResourceTest {
     // Upload Segments
     for (int i = 0; i < 5; ++i) {
       SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
-      ControllerTestUtils.getHelixResourceManager()
-          .addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), segmentMetadata, "downloadUrl");
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, segmentMetadata, "downloadUrl");
       segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
     }
@@ -72,8 +71,7 @@ public class PinotSegmentRestletResourceTest {
     // Add more segments
     for (int i = 0; i < 5; ++i) {
       SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
-      ControllerTestUtils.getHelixResourceManager()
-          .addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), segmentMetadata, "downloadUrl");
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, segmentMetadata, "downloadUrl");
       segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
     }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 3c46bda..ba8898e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -54,7 +54,7 @@ public class TableViewsTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(2).build();
     Assert.assertEquals(ControllerTestUtils.getHelixManager().getInstanceType(), InstanceType.CONTROLLER);
     ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
-    ControllerTestUtils.getHelixResourceManager().addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME),
+    ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
         SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, OFFLINE_SEGMENT_NAME), "downloadUrl");

     // Create the hybrid table
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 9f57faf..597c746 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -41,7 +40,6 @@ import static org.testng.Assert.fail;

 public class ZKOperatorTest {
   private static final String TABLE_NAME = "operatorTestTable";
-  private static final String TABLE_NAME_WITH_TYPE = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
   private static final String SEGMENT_NAME = "testSegment";

   @BeforeClass
@@ -61,7 +59,7 @@ public class ZKOperatorTest {
     when(segmentMetadata.getCrc()).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
     HttpHeaders httpHeaders = mock(HttpHeaders.class);
-    zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, segmentMetadata, null, null, false, httpHeaders, "downloadUrl",
+    zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, "downloadUrl",
         false, "crypter");

     OfflineSegmentZKMetadata segmentZKMetadata =
@@ -77,7 +75,7 @@ public class ZKOperatorTest {
     // Refresh the segment with unmatched IF_MATCH field
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
     try {
-      zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, segmentMetadata, null, null, false, httpHeaders,
+      zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, null, false, httpHeaders,
           "otherDownloadUrl", false, null);
       fail();
     } catch (Exception e) {
@@ -88,7 +86,7 @@ public class ZKOperatorTest {
     // downloadURL and crypter
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
-    zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, segmentMetadata, null, null, false, httpHeaders,
+    zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, null, false, httpHeaders,
         "otherDownloadUrl", false, "otherCrypter");

     segmentZKMetadata = ControllerTestUtils
        .getHelixResourceManager().getOfflineSegmentZKMetadata(TABLE_NAME, SEGMENT_NAME);
@@ -110,7 +108,7 @@ public class ZKOperatorTest {
     // 1 second delay to avoid "org.apache.helix.HelixException: Specified EXTERNALVIEW operatorTestTable_OFFLINE is
     // not found!" exception from being thrown sporadically.
Thread.sleep(1000L); - zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, segmentMetadata, null, null, false, httpHeaders, + zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, "otherDownloadUrl", false, "otherCrypter"); segmentZKMetadata = ControllerTestUtils .getHelixResourceManager().getOfflineSegmentZKMetadata(TABLE_NAME, SEGMENT_NAME); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java index 14523a3..726e2d0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java @@ -70,7 +70,7 @@ public class ControllerInstanceToggleTest { // Add segments for (int i = 0; i < ControllerTestUtils.NUM_SERVER_INSTANCES; i++) { - ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME, + ControllerTestUtils.getHelixResourceManager().addNewSegment(RAW_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl"); Assert.assertEquals( ControllerTestUtils diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java index 929d651..616b491 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java @@ -26,7 +26,6 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeClass; @@ -65,9 +64,8 @@ public class ControllerSentinelTestV2 { Assert.assertEquals( ControllerTestUtils .getHelixAdmin().getResourceIdealState(ControllerTestUtils.getHelixClusterName(), TABLE_NAME + "_OFFLINE").getNumPartitions(), i); - ControllerTestUtils.getHelixResourceManager() - .addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), - SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl"); + ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), + "downloadUrl"); Assert.assertEquals( ControllerTestUtils .getHelixAdmin().getResourceIdealState(ControllerTestUtils.getHelixClusterName(), TABLE_NAME + "_OFFLINE").getNumPartitions(), diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java index ea5326e..e1fc87b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java @@ -18,23 +18,16 @@ */ package org.apache.pinot.controller.helix; -import java.util.HashMap; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
java.util.concurrent.TimeUnit; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; -import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerTestUtils; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; -import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; -import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.config.table.UpsertConfig; -import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.Assert; @@ -44,30 +37,15 @@ import org.testng.annotations.Test; public class PinotResourceManagerTest { - private static final String OFFLINE_TABLE_NAME = "offlineResourceManagerTestTable_OFFLINE"; - private static final String REALTIME_TABLE_NAME = "realtimeResourceManagerTestTable_REALTIME"; - private static final String NUM_REPLICAS_STRING = "2"; - private static final String PARTITION_COLUMN = "Partition_Column"; + private static final String TABLE_NAME = "resourceManagerTestTable"; @BeforeClass public void setUp() throws Exception { ControllerTestUtils.setupClusterAndValidate(); - // Adding an offline table - TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).build(); - ControllerTestUtils.getHelixResourceManager().addTable(offlineTableConfig); - - // Adding an upsert enabled realtime table which consumes from a stream with 2 partitions - Schema dummySchema = ControllerTestUtils.createDummySchema(REALTIME_TABLE_NAME); - ControllerTestUtils.addSchema(dummySchema); - Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); - TableConfig realtimeTableConfig = new TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs). 
- setTableName(REALTIME_TABLE_NAME).setSchemaName(dummySchema.getSchemaName()).build(); - realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING); - realtimeTableConfig.getValidationConfig() - .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)); - realtimeTableConfig.setUpsertConfig(new UpsertConfig((UpsertConfig.Mode.FULL))); - ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig); + // Adding table + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + ControllerTestUtils.getHelixResourceManager().addTable(tableConfig); } @Test @@ -77,21 +55,21 @@ public class PinotResourceManagerTest { // Segment ZK metadata does not exist Assert.assertFalse( - ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); + ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); // Set segment ZK metadata Assert.assertTrue( - ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata)); + ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + "_OFFLINE", segmentZKMetadata)); // Update ZK metadata Assert.assertEquals( - ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 0); + ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 0); Assert.assertTrue( - ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); + ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); Assert.assertEquals( - ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 1); + ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 1); Assert.assertFalse( - ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); + ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); } /** @@ -104,12 +82,11 @@ public class PinotResourceManagerTest { @Test public void testBasicAndConcurrentAddingAndDeletingSegments() throws Exception { - final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME); + final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME); // Basic add/delete case for (int i = 1; i <= 2; i++) { - ControllerTestUtils.getHelixResourceManager().addNewSegment( - OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), + ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl"); } IdealState idealState = ControllerTestUtils @@ -131,8 +108,8 @@ public class PinotResourceManagerTest { @Override public void run() { for (int i = 0; i < 10; i++) { - ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), "downloadUrl"); + ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, + 
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl"); } } }); @@ -161,54 +138,6 @@ public class PinotResourceManagerTest { Assert.assertEquals(idealState.getPartitionSet().size(), 0); } - @Test - public void testAddingRealtimeTableSegmentsWithPartitionIdInZkMetadata() { - // Add three segments: two from partition 0 and 1 from partition 1; - String partition0Segment0 = "realtimeResourceManagerTestTable__aa"; - String partition0Segment1 = "realtimeResourceManagerTestTable__bb"; - String partition1Segment1 = "realtimeResourceManagerTestTable__cc"; - ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils - .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment0, PARTITION_COLUMN, 0), - "downloadUrl"); - ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils - .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment1, PARTITION_COLUMN, 0), - "downloadUrl"); - ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils - .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition1Segment1, PARTITION_COLUMN, 1), - "downloadUrl"); - Map<String, Integer> segment2PartitionId = new HashMap<>(); - segment2PartitionId.put(partition0Segment0, 0); - segment2PartitionId.put(partition0Segment1, 0); - segment2PartitionId.put(partition1Segment1, 1); - - IdealState idealState = ControllerTestUtils.getHelixAdmin() - .getResourceIdealState(ControllerTestUtils.getHelixClusterName(), - TableNameBuilder.REALTIME.tableNameWithType(REALTIME_TABLE_NAME)); - Set<String> segments = idealState.getPartitionSet(); - Assert.assertEquals(segments.size(), 5); - Assert.assertTrue(segments.contains(partition0Segment0)); - Assert.assertTrue(segments.contains(partition0Segment1)); - Assert.assertTrue(segments.contains(partition1Segment1)); - - // Check the segments of the same partition is assigned to the same set of servers. 
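The assertion loop being removed here boils down to one invariant: segments that share a partition id must be hosted by the same server set. A self-contained sketch of that check, using only the IdealState and LLCSegmentName APIs already present in this patch (the helper class name and the map parameter for uploaded segments are assumptions for illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.utils.LLCSegmentName;

final class PartitionAffinitySketch {
  // Resolves a partition id per segment (from the LLC segment name when the
  // segment was consumed, otherwise from the caller-provided map for uploaded
  // segments) and fails if two segments of one partition use different servers.
  static void verify(IdealState idealState, Map<String, Integer> uploadedSegmentPartitions) {
    Map<Integer, Set<String>> assignmentByPartition = new HashMap<>();
    for (String segment : idealState.getPartitionSet()) {
      Integer partitionId = LLCSegmentName.isLowLevelConsumerSegmentName(segment)
          ? new LLCSegmentName(segment).getPartitionGroupId()
          : uploadedSegmentPartitions.get(segment);
      Set<String> instances = idealState.getInstanceSet(segment);
      Set<String> previous = assignmentByPartition.putIfAbsent(partitionId, instances);
      if (previous != null && !previous.equals(instances)) {
        throw new AssertionError("Partition " + partitionId + " spans different server sets");
      }
    }
  }
}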
- Map<Integer, Set<String>> segmentAssignment = new HashMap<>(); - for (String segment : segments) { - Integer partitionId; - if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) { - partitionId = new LLCSegmentName(segment).getPartitionGroupId(); - } else { - partitionId = segment2PartitionId.get(segment); - } - Assert.assertNotNull(partitionId); - Set<String> instances = idealState.getInstanceSet(segment); - if (segmentAssignment.containsKey(partitionId)) { - Assert.assertEquals(instances, segmentAssignment.get(partitionId)); - } else { - segmentAssignment.put(partitionId, instances); - } - } - } - @AfterClass public void tearDown() { ControllerTestUtils.cleanup(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 196633a..e2393e3 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -105,7 +105,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // Add the segments int numSegments = 10; for (int i = 0; i < numSegments; i++) { - _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, + _helixResourceManager.addNewSegment(RAW_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i), null); } Map<String, Map<String, String>> oldSegmentAssignment = @@ -343,7 +343,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); // keep decreasing end time from today in steps of 3. 3 segments don't move. 3 segment on tierA. 
4 segments on tierB for (int i = 0; i < numSegments; i++) { - _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME, SegmentMetadataMockUtils + _helixResourceManager.addNewSegment(TIERED_TABLE_NAME, SegmentMetadataMockUtils .mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME, SEGMENT_NAME_PREFIX + i, nowInDays), null); nowInDays -= 3; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index cc7ecaf..6d52c03 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -72,7 +72,7 @@ public class RetentionManagerTest { for (int i = 0; i < numOlderSegments; ++i) { SegmentMetadata segmentMetadata = mockSegmentMetadata(pastTimeStamp, pastTimeStamp, timeUnit); OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata(); - ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE); + ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata); metadataList.add(offlineSegmentZKMetadata); removedSegments.add(offlineSegmentZKMetadata.getSegmentName()); } @@ -81,7 +81,7 @@ public class RetentionManagerTest { SegmentMetadata segmentMetadata = mockSegmentMetadata(dayAfterTomorrowTimeStamp, dayAfterTomorrowTimeStamp, timeUnit); OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata(); - ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE); + ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata); metadataList.add(offlineSegmentZKMetadata); } final TableConfig tableConfig = createOfflineTableConfig(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java index 95e400a..1e0e210 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java @@ -19,12 +19,9 @@ package org.apache.pinot.controller.utils; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; -import org.apache.pinot.segment.local.partition.MurmurPartitionFunction; import org.apache.pinot.segment.local.segment.index.metadata.ColumnMetadata; import org.apache.pinot.segment.local.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.SegmentMetadata; @@ -78,7 +75,6 @@ public class SegmentMetadataMockUtils { ColumnMetadata columnMetadata = mock(ColumnMetadata.class); Set<Integer> partitions = Collections.singleton(partitionNumber); when(columnMetadata.getPartitions()).thenReturn(partitions); - when(columnMetadata.getPartitionFunction()).thenReturn(new MurmurPartitionFunction(5)); SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); if (columnName != null) { @@ -87,10 +83,6 @@ public class SegmentMetadataMockUtils { 
when(segmentMetadata.getTableName()).thenReturn(tableName); when(segmentMetadata.getName()).thenReturn(segmentName); when(segmentMetadata.getCrc()).thenReturn("0"); - - Map<String, ColumnMetadata> columnMetadataMap = new HashMap<>(); - columnMetadataMap.put(columnName, columnMetadata); - when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap); return segmentMetadata; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java index 1296e38..86dbaf8 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java @@ -50,7 +50,7 @@ import static org.testng.Assert.assertEquals; */ public class ValidationManagerTest { private static final String TEST_TABLE_NAME = "validationTable"; - private static final String OFFLINE_TEST_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME); + private static final String TEST_TABLE_TWO = "validationTable2"; private static final String TEST_SEGMENT_NAME = "testSegment"; private TableConfig _offlineTableConfig; @@ -68,7 +68,7 @@ public class ValidationManagerTest { public void testPushTimePersistence() { SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME); - ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TEST_TABLE_NAME, segmentMetadata, "downloadUrl"); + ControllerTestUtils.getHelixResourceManager().addNewSegment(TEST_TABLE_NAME, segmentMetadata, "downloadUrl"); OfflineSegmentZKMetadata offlineSegmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME); long pushTime = offlineSegmentZKMetadata.getPushTime(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 5d4d43d..83beeb2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -45,7 +45,6 @@ import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.NamedThreadFactory; import org.apache.pinot.common.utils.SegmentName; -import org.apache.pinot.common.utils.SegmentUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.core.data.manager.BaseTableDataManager; @@ -263,24 +262,24 @@ public class RealtimeTableDataManager extends BaseTableDataManager { Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkNotNull(schema); - File segmentDir = new File(_indexDir, segmentName); + File indexDir = new File(_indexDir, segmentName); // Restart during segment reload might leave segment in inconsistent state (index directory might not exist but // segment backup directory existed), need to first try to recover from reload failure before checking the existence // of the index directory and loading segment from it - LoaderUtils.reloadFailureRecovery(segmentDir); + 
LoaderUtils.reloadFailureRecovery(indexDir); boolean isLLCSegment = SegmentName.isLowLevelConsumerSegmentName(segmentName); - if (segmentDir.exists()) { + if (indexDir.exists()) { // Segment already exists on disk if (realtimeSegmentZKMetadata.getStatus() == Status.DONE) { // Metadata has been committed, load the local segment try { - addSegment(ImmutableSegmentLoader.load(segmentDir, indexLoadingConfig, schema)); + addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema)); return; } catch (Exception e) { if (isLLCSegment) { // For LLC and segments, delete the local copy and download a new copy from the controller - FileUtils.deleteQuietly(segmentDir); + FileUtils.deleteQuietly(indexDir); if (e instanceof V3RemoveIndexException) { _logger.info("Unable to remove index from V3 format segment: {}, downloading a new copy", segmentName, e); } else { @@ -294,18 +293,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } } else { // Metadata has not been committed, delete the local segment - FileUtils.deleteQuietly(segmentDir); + FileUtils.deleteQuietly(indexDir); } - } else if (realtimeSegmentZKMetadata.getStatus() == Status.UPLOADED) { - // The segment is uploaded to an upsert enabled realtime table. Download the segment and load. - Preconditions.checkArgument(realtimeSegmentZKMetadata instanceof LLCRealtimeSegmentZKMetadata, - "Upload segment is not LLC segment"); - String downURL = ((LLCRealtimeSegmentZKMetadata)realtimeSegmentZKMetadata).getDownloadUrl(); - Preconditions.checkNotNull(downURL, "Upload segment metadata has no download url"); - downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, downURL); - _logger.info("Downloaded, untarred and add segment {} of table {} from {}", segmentName, tableConfig.getTableName(), - downURL); - return; } // Start a new consuming segment or download the segment from the controller @@ -361,8 +350,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName)); int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs(); String segmentName = immutableSegment.getSegmentName(); - int partitionGroupId = SegmentUtils - .getRealtimeSegmentPartitionId(segmentName, this.getTableName(), _helixManager, _primaryKeyColumns.get(0)); + int partitionGroupId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionGroupId(); PartitionUpsertMetadataManager partitionUpsertMetadataManager = _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId); int numPrimaryKeyColumns = _primaryKeyColumns.size(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 23efd0e..5486a43 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -38,15 +38,10 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties; -import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.FieldConfig; -import 
org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; -import org.apache.pinot.spi.config.table.RoutingConfig; -import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfig; @@ -365,26 +360,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { } /** - * Creates a new Upsert enabled table config. - */ - protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) { - AvroFileSchemaKafkaAvroMessageDecoder.avroFile = sampleAvroFile; - Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>(); - columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions)); - - return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName()) - .setTimeColumnName(getTimeColumnName()) - .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) - .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()) - .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setLLC(useLlc()) - .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()) - .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) - .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap)) - .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1)) - .setUpsertConfig(new UpsertConfig((UpsertConfig.Mode.FULL))).build(); - } - - /** * Returns the REALTIME table config in the cluster. 
*/ protected TableConfig getRealtimeTableConfig() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java index 87eafb6..cc01ae6 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java @@ -182,9 +182,9 @@ public class BasicAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest // download and sanity-check size of offline segment(s) for (int i = 0; i < offlineSegments.size(); i++) { String segment = offlineSegments.get(i).asText(); - Assert.assertTrue(sendGetRequest(_controllerRequestURLBuilder - .forSegmentDownload(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), segment), AUTH_HEADER) - .length() > 200000); // download segment + Assert.assertTrue( + sendGetRequest(_controllerRequestURLBuilder.forSegmentDownload(getTableName(), segment), AUTH_HEADER).length() + > 200000); // download segment } } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java deleted file mode 100644 index 0633d52..0000000 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.integration.tests; - -import java.io.File; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import org.apache.helix.model.IdealState; -import org.apache.http.HttpStatus; -import org.apache.pinot.common.utils.FileUploadDownloadClient; -import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.helix.HelixHelper; -import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.util.TestUtils; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - - -public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet { - private static final int NUM_BROKERS = 1; - private static final int NUM_SERVERS = 2; - // Segment 1 contains records of pk value 100000 - private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %"; - // Segment 2 contains records of pk value 100001 - private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %"; - // Segment 3 contains records of pk value 100000 - private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %"; - private static final String PRIMARY_KEY_COL = "clientId"; - private static final String TABLE_NAME_WITH_TYPE = "mytable_REALTIME"; - - @BeforeClass - public void setUp() - throws Exception { - TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); - - // Start the Pinot cluster - startZk(); - // Start a customized controller with more frequent realtime segment validation - startController(); - startBrokers(getNumBrokers()); - startServers(NUM_SERVERS); - - // Start Kafka - startKafka(); - - // Create and upload the schema. 
- Schema schema = createSchema(); - addSchema(schema); - - // Unpack the Avro files - List<File> avroFiles = unpackAvroData(_tempDir); - - // Push data to Kafka - pushAvroIntoKafka(avroFiles); - // Create and upload the table config - TableConfig upsertTableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions()); - addTableConfig(upsertTableConfig); - - // Create and upload segments - ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, upsertTableConfig, schema, 0, _segmentDir, _tarDir); - uploadSegments(getTableName(), TableType.REALTIME, _tarDir); - - // Wait for all documents loaded - waitForAllDocsLoaded(600_000L); - } - - @Override - protected String getSchemaFileName() { - return "upsert_table_test.schema"; - } - - @Override - protected String getSchemaName() { - return "upsertSchema"; - } - - @Override - protected String getAvroTarFileName() { - return "upsert_test.tar.gz"; - } - - @Override - protected boolean useLlc() { - return true; - } - - protected int getNumBrokers() { - return NUM_BROKERS; - } - - @Override - protected long getCountStarResult() { - // Three distinct records are expected with pk values of 100000, 100001, 100002 - return 3; - } - - @Override - protected String getPartitionColumn() { - return PRIMARY_KEY_COL; - } - - @Override - protected void startController() { - Map<String, Object> controllerConfig = getDefaultControllerConfiguration(); - // Perform realtime segment validation every second with 1 second initial delay. - controllerConfig - .put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS, 1); - controllerConfig.put(ControllerConf.ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, 1); - controllerConfig - .put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS, 1); - startController(controllerConfig); - } - - @Test - public void testSegmentAssignment() - throws Exception { - IdealState idealState = HelixHelper.getTableIdealState(_helixManager, TABLE_NAME_WITH_TYPE); - Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult()); - verifyTableIdealStates(idealState); - // Wait 3 seconds to let the realtime validation thread to run. - Thread.sleep(3000); - // Verify the result again. - Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult()); - verifyTableIdealStates(idealState); - } - - private void verifyTableIdealStates(IdealState idealState) { - // Verify various ideal state properties - Set<String> segments = idealState.getPartitionSet(); - Assert.assertEquals(segments.size(), 5); - Map<String, Integer> segment2PartitionId = new HashMap<>(); - segment2PartitionId.put(UPLOADED_SEGMENT_1, 0); - segment2PartitionId.put(UPLOADED_SEGMENT_2, 1); - segment2PartitionId.put(UPLOADED_SEGMENT_3, 1); - - // Verify that all segments of the same partition are mapped to the same single server. 
- Map<Integer, Set<String>> segmentAssignment = new HashMap<>(); - for (String segment : segments) { - Integer partitionId; - if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) { - partitionId = new LLCSegmentName(segment).getPartitionGroupId(); - } else { - partitionId = segment2PartitionId.get(segment); - } - Assert.assertNotNull(partitionId); - Set<String> instances = idealState.getInstanceSet(segment); - Assert.assertEquals(1, instances.size()); - if (segmentAssignment.containsKey(partitionId)) { - Assert.assertEquals(instances, segmentAssignment.get(partitionId)); - } else { - segmentAssignment.put(partitionId, instances); - } - } - } - - private void uploadSegments(String tableName, TableType tableType, File tarDir) - throws Exception { - File[] segmentTarFiles = tarDir.listFiles(); - assertNotNull(segmentTarFiles); - int numSegments = segmentTarFiles.length; - assertTrue(numSegments > 0); - - URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort); - try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { - if (numSegments == 1) { - File segmentTarFile = segmentTarFiles[0]; - assertEquals(fileUploadDownloadClient - .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, tableType) - .getStatusCode(), HttpStatus.SC_OK); - } else { - // Upload all segments in parallel - ExecutorService executorService = Executors.newFixedThreadPool(numSegments); - List<Future<Integer>> futures = new ArrayList<>(numSegments); - for (File segmentTarFile : segmentTarFiles) { - futures.add(executorService.submit(() -> { - return fileUploadDownloadClient - .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, tableType) - .getStatusCode(); - })); - } - executorService.shutdown(); - for (Future<Integer> future : futures) { - assertEquals((int) future.get(), HttpStatus.SC_OK); - } - } - } - } -} diff --git a/pinot-integration-tests/src/test/resources/upsert_table_test.schema b/pinot-integration-tests/src/test/resources/upsert_table_test.schema deleted file mode 100644 index 3f656f7..0000000 --- a/pinot-integration-tests/src/test/resources/upsert_table_test.schema +++ /dev/null @@ -1,33 +0,0 @@ -{ - "dimensionFieldSpecs": [ - { - "dataType": "INT", - "singleValueField": true, - "name": "clientId" - }, - { - "dataType": "STRING", - "singleValueField": true, - "name": "city" - }, - { - "dataType": "STRING", - "singleValueField": true, - "name": "description" - }, - { - "dataType": "INT", - "singleValueField": true, - "name": "salary" - } - ], - "timeFieldSpec": { - "incomingGranularitySpec": { - "timeType": "DAYS", - "dataType": "INT", - "name": "DaysSinceEpoch" - } - }, - "primaryKeyColumns": ["clientId"], - "schemaName": "upsertSchema" -} diff --git a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz b/pinot-integration-tests/src/test/resources/upsert_test.tar.gz deleted file mode 100644 index 0bd0dc8..0000000 Binary files a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz and /dev/null differ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index 8e02f85..9289f9c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -118,8 +118,6 @@ public class PartitionUpsertMetadataManager { // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment). if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || ( recordInfo._timestamp == currentRecordLocation.getTimestamp() - && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName) - && LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName()) && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName .getSequenceNumber(currentRecordLocation.getSegmentName()))) { currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 534a3a0..6a5d74a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -394,12 +394,7 @@ public class CommonConstants { public static class Segment { public static class Realtime { public enum Status { - // Means the segment is in CONSUMING state. - IN_PROGRESS, - // Means the segment is in ONLINE state (segment completed consuming and has been saved in segment store). - DONE, - // Means the segment is uploaded to a Pinot controller by an external party. - UPLOADED + IN_PROGRESS, DONE } /** diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java index 5afec65..5588a8c 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java @@ -329,9 +329,10 @@ public class PerfBenchmarkDriver { * * @param segmentMetadata segment metadata. 
*/ - public void addSegment(String tableNameWithType, SegmentMetadata segmentMetadata) { + public void addSegment(String tableName, SegmentMetadata segmentMetadata) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); _helixResourceManager - .addNewSegment(tableNameWithType, segmentMetadata, "http://" + _controllerAddress + "/" + segmentMetadata.getName()); + .addNewSegment(rawTableName, segmentMetadata, "http://" + _controllerAddress + "/" + segmentMetadata.getName()); } public static void waitForExternalViewUpdate(String zkAddress, final String clusterName, long timeoutInMilliseconds) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java index 8f2796e..a785a92 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java @@ -62,7 +62,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command @Option(name = "-timeoutInSeconds", required = false, metaVar = "<int>", usage = "Timeout in seconds for batch load (default 60).") private int _timeoutInSeconds = 60; - @Option(name = "-tableNames", required = false, metaVar = "<String>", usage = "Comma separated table names with types to be loaded (non-batch load).") + @Option(name = "-tableNames", required = false, metaVar = "<String>", usage = "Comma separated table names to be loaded (non-batch load).") private String _tableNames; @Option(name = "-invertedIndexColumns", required = false, metaVar = "<String>", usage = "Comma separated inverted index columns to be created (non-batch load).")
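One closing note on the PerfBenchmarkDriver hunk above: after this revert, addSegment tolerates either a raw table name or one carrying a type suffix, because TableNameBuilder.extractRawTableName normalizes both to the raw name before addNewSegment is called. A small, self-contained illustration of that behavior (the demo class name is invented; the printed values in the comments reflect the expected TableNameBuilder behavior):

import org.apache.pinot.spi.utils.builder.TableNameBuilder;

final class TableNameNormalizationDemo {
  public static void main(String[] args) {
    // extractRawTableName strips the type suffix when present and returns the
    // input unchanged otherwise, so the reverted addSegment accepts both forms.
    System.out.println(TableNameBuilder.extractRawTableName("myTable_OFFLINE")); // myTable
    System.out.println(TableNameBuilder.extractRawTableName("myTable"));         // myTable
    // Going the other way, as several reverted test call sites did:
    System.out.println(TableNameBuilder.OFFLINE.tableNameWithType("myTable"));   // myTable_OFFLINE
  }
}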