This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e02bdda New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358) e02bdda is described below commit e02bdda1501abb82e2efbc27674a74a6fb5519b5 Author: Tim Santos <t...@cortexdata.io> AuthorDate: Fri Mar 25 21:59:04 2022 +0100 New Pinot byte metrics for compressed tar.gz and table size w/o replicas (#8358) --- .../etc/jmx_prometheus_javaagent/configs/pinot.yml | 15 ++++ .../broker/broker/HelixBrokerStarterTest.java | 2 +- .../common/metadata/segment/SegmentZKMetadata.java | 8 ++ .../pinot/common/metrics/ControllerGauge.java | 10 +++ .../pinot/controller/BaseControllerStarter.java | 3 +- .../apache/pinot/controller/ControllerConf.java | 2 +- .../PinotSegmentUploadDownloadRestletResource.java | 21 +++-- .../pinot/controller/api/upload/ZKOperator.java | 20 +++-- .../controller/helix/SegmentStatusChecker.java | 35 +++++++- .../helix/core/PinotHelixResourceManager.java | 30 ++++++- .../pinot/controller/util/TableSizeReader.java | 39 +++++---- .../controller/validation/StorageQuotaChecker.java | 14 +--- .../pinot/controller/api/TableSizeReaderTest.java | 68 +++++++++++----- .../controller/api/upload/ZKOperatorTest.java | 16 ++-- .../controller/helix/SegmentStatusCheckerTest.java | 95 +++++++++++++++++++--- .../validation/StorageQuotaCheckerTest.java | 7 +- .../validation/ValidationManagerTest.java | 3 +- .../apache/pinot/spi/utils/CommonConstants.java | 1 + 18 files changed, 297 insertions(+), 92 deletions(-) diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml index dcaff91..e784180 100644 --- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml +++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml @@ -106,6 +106,21 @@ rules: cache: true labels: table: "$1" +- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableTotalSizeOnServer.(\\w+)_(\\w+)\"><>(\\w+)" + name: "pinot_controller_tableTotalSizeOnServer_$3" + labels: + table: "$1" + tableType: "$2" +- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableSizePerReplicaOnServer.(\\w+)_(\\w+)\"><>(\\w+)" + name: "pinot_controller_tableSizePerReplicaOnServer_$3" + labels: + table: "$1" + tableType: "$2" +- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableCompressedSize.(\\w+)_(\\w+)\"><>(\\w+)" + name: "pinot_controller_tableCompressedSize_$3" + labels: + table: "$1" + tableType: "$2" - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableQuota.(\\w+)_(\\w+)\"><>(\\w+)" name: "pinot_controller_tableQuota_$3" cache: true diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index 3f7d59b..7f29c98 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -228,7 +228,7 @@ public class HelixBrokerStarterTest extends ControllerTest { _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh); _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime), - segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null); + segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null, -1); TestUtils.waitForCondition(aVoid -> routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue() .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment"); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java index 13264a1..0b73af4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java @@ -125,6 +125,14 @@ public class SegmentZKMetadata implements ZKMetadata { setNonNegativeValue(Segment.TOTAL_DOCS, totalDocs); } + public void setSizeInBytes(long sizeInBytes) { + setNonNegativeValue(Segment.SIZE_IN_BYTES, sizeInBytes); + } + + public long getSizeInBytes() { + return _znRecord.getLongField(Segment.SIZE_IN_BYTES, -1); + } + public long getCrc() { return _znRecord.getLongField(Segment.CRC, -1); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index ae33acc..7e908ff 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -67,8 +67,18 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { CONTROLLER_LEADER_PARTITION_COUNT("ControllerLeaderPartitionCount", true), // Estimated size of offline table + @Deprecated // Instead use TABLE_TOTAL_SIZE_ON_SERVER OFFLINE_TABLE_ESTIMATED_SIZE("OfflineTableEstimatedSize", false), + // Total size of table across replicas on servers + TABLE_TOTAL_SIZE_ON_SERVER("TableTotalSizeOnServer", false), + + // Size of table per replica on servers + TABLE_SIZE_PER_REPLICA_ON_SERVER("TableSizePerReplicaOnServer", false), + + // Total size of compressed segments per table + TABLE_COMPRESSED_SIZE("TableCompressedSize", false), + // Table quota based on setting in table config TABLE_QUOTA("TableQuotaBasedOnTableConfig", false), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 6a3e9a5..94e68c7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -646,7 +646,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics); periodicTasks.add(_brokerResourceValidationManager); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); periodicTasks.add(_segmentStatusChecker); _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index cbda36a..25514ab 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -251,7 +251,7 @@ public class ControllerConf extends PinotConfiguration { private static final String DEFAULT_DIM_TABLE_MAX_SIZE = "200M"; private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName(); - public static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy"; + private static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy"; public ControllerConf() { super(new HashMap<>()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 253a3fb..dfd3b54 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -214,17 +214,28 @@ public class PinotSegmentUploadDownloadRestletResource { boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader); FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr); File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile; + long segmentSizeInBytes; switch (uploadType) { case URI: downloadSegmentFileFromURI(downloadUri, dstFile, tableName); + segmentSizeInBytes = dstFile.length(); break; case SEGMENT: createSegmentFileFromMultipart(multiPart, dstFile); + segmentSizeInBytes = dstFile.length(); break; case METADATA: moveSegmentToFinalLocation = false; Preconditions.checkState(downloadUri != null, "Download URI is required in segment metadata upload mode"); createSegmentFileFromMultipart(multiPart, dstFile); + try { + URI segmentURI = new URI(downloadUri); + PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme()); + segmentSizeInBytes = pinotFS.length(segmentURI); + } catch (Exception e) { + segmentSizeInBytes = -1; + LOGGER.warn("Could not fetch segment size for metadata push", e); + } break; default: throw new UnsupportedOperationException("Unsupported upload type: " + uploadType); @@ -302,7 +313,7 @@ public class PinotSegmentUploadDownloadRestletResource { // Zk operations completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, tableNameWithType, segmentMetadata, - segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh); + segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh, segmentSizeInBytes); return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType); } catch (WebApplicationException e) { @@ -397,15 +408,15 @@ public class PinotSegmentUploadDownloadRestletResource { private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile, String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI, - boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh) + boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh, long segmentSizeInBytes) throws Exception { String basePath = ControllerFilePathProvider.getInstance().getDataDirURI().toString(); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); URI finalSegmentLocationURI = URIUtils.getUri(basePath, rawTableName, URIUtils.encode(segmentName)); ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); - zkOperator - .completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile, - enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter, allowRefresh); + zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, + uploadedSegmentFile, enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter, + allowRefresh, segmentSizeInBytes); } private void decryptFile(String crypterClassName, File tempEncryptedFile, File tempDecryptedFile) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index 693d42f..28f1753 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -61,7 +61,7 @@ public class ZKOperator { public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter, - boolean allowRefresh) + boolean allowRefresh, long segmentSizeInBytes) throws Exception { String segmentName = segmentMetadata.getName(); ZNRecord segmentMetadataZNRecord = @@ -76,7 +76,8 @@ public class ZKOperator { } LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType); processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, headers, - crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection); + crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection, + segmentSizeInBytes); return; } @@ -101,13 +102,13 @@ public class ZKOperator { LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType); processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName, - segmentMetadataZNRecord, moveSegmentToFinalLocation); + segmentMetadataZNRecord, moveSegmentToFinalLocation, segmentSizeInBytes); } private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI, String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord, - boolean moveSegmentToFinalLocation) + boolean moveSegmentToFinalLocation, long segmentSizeInBytes) throws Exception { SegmentZKMetadata existingSegmentZKMetadata = new SegmentZKMetadata(znRecord); @@ -202,7 +203,7 @@ public class ZKOperator { _pinotHelixResourceManager .refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, expectedVersion, - zkDownloadURI, crypter); + zkDownloadURI, crypter, segmentSizeInBytes); } } catch (Exception e) { if (!_pinotHelixResourceManager @@ -234,10 +235,12 @@ public class ZKOperator { private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType, - String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection) + String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection, + long segmentSizeInBytes) throws Exception { - SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager - .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter); + SegmentZKMetadata newSegmentZKMetadata = + _pinotHelixResourceManager.constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, + crypter, segmentSizeInBytes); // Lock if enableParallelPushProtection is true. if (enableParallelPushProtection) { @@ -253,7 +256,6 @@ public class ZKOperator { newSegmentZKMetadata .setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap())); } - if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) { throw new RuntimeException( "Failed to create ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 3f2e378..0e69829 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -18,14 +18,18 @@ */ package org.apache.pinot.controller.helix; +import com.google.common.annotations.VisibleForTesting; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.httpclient.SimpleHttpConnectionManager; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.lineage.SegmentLineageUtils; @@ -36,6 +40,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; +import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; @@ -57,21 +62,28 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1); private static final ZNRecordSerializer RECORD_SERIALIZER = new ZNRecordSerializer(); + private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000; + private final int _waitForPushTimeSeconds; private long _lastDisabledTableLogTimestamp = 0; + private TableSizeReader _tableSizeReader; + /** * Constructs the segment status checker. * @param pinotHelixResourceManager The resource checker used to interact with Helix * @param config The controller configuration object */ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, - LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) { + LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics, + ExecutorService executorService) { super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds(); + _tableSizeReader = new TableSizeReader(executorService, new SimpleHttpConnectionManager(true), _controllerMetrics, + _pinotHelixResourceManager); } @Override @@ -96,6 +108,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh protected void processTable(String tableNameWithType, Context context) { try { updateSegmentMetrics(tableNameWithType, context); + updateTableSizeMetrics(tableNameWithType); } catch (Exception e) { LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e); // Remove the metric for this table @@ -110,6 +123,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, context._disabledTableCount); } + private void updateTableSizeMetrics(String tableNameWithType) + throws InvalidConfigException { + _tableSizeReader.getTableSizeDetails(tableNameWithType, TABLE_CHECKER_TIMEOUT_MS); + } + /** * Runs a segment status pass over the given table. * TODO: revisit the logic and reduce the ZK access @@ -173,6 +191,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh int nErrors = 0; // Keeps track of number of segments in error state int nOffline = 0; // Keeps track of number segments with no online replicas int nSegments = 0; // Counts number of segments + long tableCompressedSize = 0; // Tracks the total compressed segment size in deep store per table for (String partitionName : segmentsExcludeReplaced) { int nReplicas = 0; int nIdeal = 0; @@ -198,6 +217,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh // Push is not finished yet, skip the segment continue; } + if (segmentZKMetadata != null) { + long sizeInBytes = segmentZKMetadata.getSizeInBytes(); + if (sizeInBytes > 0) { + tableCompressedSize += sizeInBytes; + } + } nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState .getInstanceStateMap(partitionName).size() : nReplicasIdealMax; if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) { @@ -240,6 +265,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, + tableCompressedSize); + if (nOffline > 0) { LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline); } @@ -287,6 +315,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh setStatusToDefault(); } + @VisibleForTesting + void setTableSizeReader(TableSizeReader tableSizeReader) { + _tableSizeReader = tableSizeReader; + } + public static final class Context { private boolean _logDisabledTables; private int _realTimeTableCount; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index cfc83d4..ecbd3b7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -116,6 +116,7 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.segment.local.utils.ReplicationUtils; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.ConfigUtils; import org.apache.pinot.spi.config.instance.Instance; @@ -1910,7 +1911,7 @@ public class PinotHelixResourceManager { // NOTE: must first set the segment ZK metadata before assigning segment to instances because segment assignment // might need them to determine the partition of the segment, and server will need them to download the segment SegmentZKMetadata segmentZkmetadata = - constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter); + constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter, -1); ZNRecord znRecord = segmentZkmetadata.toZNRecord(); String segmentName = segmentMetadata.getName(); @@ -1930,16 +1931,18 @@ public class PinotHelixResourceManager { * @param segmentMetadata Segment metadata * @param downloadUrl Download URL * @param crypter Crypter + * @param segmentSizeInBytes Size of segment in bytes. * @return SegmentZkMetadata of the input segment */ public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, - String downloadUrl, @Nullable String crypter) { + String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { // Construct segment zk metadata with common fields for offline and realtime. String segmentName = segmentMetadata.getName(); SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName); ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata); segmentZKMetadata.setDownloadUrl(downloadUrl); segmentZKMetadata.setCrypterName(crypter); + segmentZKMetadata.setSizeInBytes(segmentSizeInBytes); if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { Preconditions.checkState(isUpsertTable(tableNameWithType), @@ -2051,7 +2054,8 @@ public class PinotHelixResourceManager { } public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata, - SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter) { + SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter, + long segmentSizeInBytes) { String segmentName = segmentMetadata.getName(); // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on segment @@ -2062,6 +2066,7 @@ public class PinotHelixResourceManager { segmentZKMetadata.setRefreshTime(System.currentTimeMillis()); segmentZKMetadata.setDownloadUrl(downloadUrl); segmentZKMetadata.setCrypterName(crypter); + segmentZKMetadata.setSizeInBytes(segmentSizeInBytes); if (!ZKMetadataProvider .setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion)) { throw new RuntimeException( @@ -3363,6 +3368,25 @@ public class PinotHelixResourceManager { return hosts; } + /** + * Returns the number of replicas for a given table config + */ + public int getNumReplicas(TableConfig tableConfig) { + if (tableConfig.isDimTable()) { + // If the table is a dimension table then fetch the tenant config and get the number of server belonging + // to the tenant + TenantConfig tenantConfig = tableConfig.getTenantConfig(); + Set<String> serverInstances = getAllInstancesForServerTenant(tenantConfig.getServer()); + return serverInstances.size(); + } + + if (ReplicationUtils.useReplicasPerPartition(tableConfig)) { + return Integer.parseInt(tableConfig.getValidationConfig().getReplicasPerPartition()); + } + + return tableConfig.getValidationConfig().getReplicationNumber(); + } + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java index 6c2556c..4cbc703 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java @@ -32,12 +32,13 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.restlet.resources.SegmentSizeInfo; import org.apache.pinot.controller.api.resources.ServerTableSizeReader; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,36 +79,40 @@ public class TableSizeReader { Preconditions.checkNotNull(tableName, "Table name should not be null"); Preconditions.checkArgument(timeoutMsec > 0, "Timeout value must be greater than 0"); - boolean hasRealtimeTable = false; - boolean hasOfflineTable = false; - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + TableConfig offlineTableConfig = + ZKMetadataProvider.getOfflineTableConfig(_helixResourceManager.getPropertyStore(), tableName); + TableConfig realtimeTableConfig = + ZKMetadataProvider.getRealtimeTableConfig(_helixResourceManager.getPropertyStore(), tableName); - if (tableType != null) { - hasRealtimeTable = tableType == TableType.REALTIME; - hasOfflineTable = tableType == TableType.OFFLINE; - } else { - hasRealtimeTable = _helixResourceManager.hasRealtimeTable(tableName); - hasOfflineTable = _helixResourceManager.hasOfflineTable(tableName); - } - - if (!hasOfflineTable && !hasRealtimeTable) { + if (offlineTableConfig == null && realtimeTableConfig == null) { return null; } - TableSizeDetails tableSizeDetails = new TableSizeDetails(tableName); - - if (hasRealtimeTable) { + if (realtimeTableConfig != null) { String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); tableSizeDetails._realtimeSegments = getTableSubtypeSize(realtimeTableName, timeoutMsec); tableSizeDetails._reportedSizeInBytes += tableSizeDetails._realtimeSegments._reportedSizeInBytes; tableSizeDetails._estimatedSizeInBytes += tableSizeDetails._realtimeSegments._estimatedSizeInBytes; + + _controllerMetrics.setValueOfTableGauge(realtimeTableName, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER, + tableSizeDetails._realtimeSegments._estimatedSizeInBytes); + _controllerMetrics.setValueOfTableGauge(realtimeTableName, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER, + tableSizeDetails._realtimeSegments._estimatedSizeInBytes / _helixResourceManager.getNumReplicas( + realtimeTableConfig)); } - if (hasOfflineTable) { + if (offlineTableConfig != null) { String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); tableSizeDetails._offlineSegments = getTableSubtypeSize(offlineTableName, timeoutMsec); tableSizeDetails._reportedSizeInBytes += tableSizeDetails._offlineSegments._reportedSizeInBytes; tableSizeDetails._estimatedSizeInBytes += tableSizeDetails._offlineSegments._estimatedSizeInBytes; + + _controllerMetrics.setValueOfTableGauge(offlineTableName, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER, + tableSizeDetails._offlineSegments._estimatedSizeInBytes); + _controllerMetrics.setValueOfTableGauge(offlineTableName, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER, + tableSizeDetails._offlineSegments._estimatedSizeInBytes / _helixResourceManager.getNumReplicas( + offlineTableConfig)); } + return tableSizeDetails; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java index ee732b0..4920cf5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java @@ -19,7 +19,6 @@ package org.apache.pinot.controller.validation; import com.google.common.base.Preconditions; -import java.util.Set; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -27,7 +26,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.utils.DataSizeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,17 +84,7 @@ public class StorageQuotaChecker { // 3. update predicted segment sizes // 4. is the updated size within quota QuotaConfig quotaConfig = _tableConfig.getQuotaConfig(); - int numReplicas; - - if (_tableConfig.isDimTable()) { - // If the table is a dimension table then fetch the tenant config and get the number of server belonging - // to the tenant - TenantConfig tenantConfig = _tableConfig.getTenantConfig(); - Set<String> serverInstances = _pinotHelixResourceManager.getAllInstancesForServerTenant(tenantConfig.getServer()); - numReplicas = serverInstances.size(); - } else { - numReplicas = _tableConfig.getValidationConfig().getReplicationNumber(); - } + int numReplicas = _pinotHelixResourceManager.getNumReplicas(_tableConfig); final String tableNameWithType = _tableConfig.getTableName(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java index f83fbe6..04dbb51 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java @@ -33,16 +33,22 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; +import org.apache.helix.AccessOption; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.PinotMetricUtils; import org.apache.pinot.common.restlet.resources.SegmentSizeInfo; import org.apache.pinot.common.restlet.resources.TableSizeInfo; +import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.controller.utils.FakeHttpServer; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.mockito.ArgumentMatchers; import org.mockito.invocation.InvocationOnMock; @@ -64,6 +70,7 @@ public class TableSizeReaderTest { private static final String URI_PATH = "/table/"; private static final int TIMEOUT_MSEC = 10000; private static final int EXTENDED_TIMEOUT_FACTOR = 100; + private static final int NUM_REPLICAS = 2; private final Executor _executor = Executors.newFixedThreadPool(1); private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager(); @@ -76,23 +83,25 @@ public class TableSizeReaderTest { public void setUp() throws IOException { _helix = mock(PinotHelixResourceManager.class); - when(_helix.hasOfflineTable(anyString())).thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) - throws Throwable { - String table = (String) invocationOnMock.getArguments()[0]; - return table.indexOf("offline") >= 0; - } - }); - when(_helix.hasRealtimeTable(anyString())).thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) - throws Throwable { - String table = (String) invocationOnMock.getArguments()[0]; - return table.indexOf("realtime") >= 0; - } - }); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setNumReplicas(NUM_REPLICAS).build(); + ZkHelixPropertyStore mockPropertyStore = mock(ZkHelixPropertyStore.class); + + when(mockPropertyStore.get(ArgumentMatchers.anyString(), ArgumentMatchers.eq(null), + ArgumentMatchers.eq(AccessOption.PERSISTENT))).thenAnswer((Answer) invocationOnMock -> { + String path = (String) invocationOnMock.getArguments()[0]; + if (path.contains("realtime_REALTIME")) { + return TableConfigUtils.toZNRecord(tableConfig); + } + if (path.contains("offline_OFFLINE")) { + return TableConfigUtils.toZNRecord(tableConfig); + } + return null; + }); + + when(_helix.getPropertyStore()).thenReturn(mockPropertyStore); + when(_helix.getNumReplicas(ArgumentMatchers.eq(tableConfig))).thenReturn(NUM_REPLICAS); int counter = 0; // server0 @@ -313,6 +322,10 @@ public class TableSizeReaderTest { String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table); Assert.assertEquals(_controllerMetrics .getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 0); + Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, + ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); + Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, + ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes); } @Test @@ -330,6 +343,10 @@ public class TableSizeReaderTest { String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table); Assert.assertEquals(_controllerMetrics .getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 100); + Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, + ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); + Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, + ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes); } @Test @@ -345,19 +362,32 @@ public class TableSizeReaderTest { validateTableSubTypeSize(servers, offlineSizes); Assert.assertNull(tableSizeDetails._realtimeSegments); String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table); - Assert.assertEquals(_controllerMetrics - .getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20); + Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, + ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), + offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), + offlineSizes._estimatedSizeInBytes); } @Test public void getTableSizeDetailsRealtimeOnly() throws InvalidConfigException { final String[] servers = {"server3", "server4"}; - TableSizeReader.TableSizeDetails tableSizeDetails = testRunner(servers, "realtime"); + String table = "realtime"; + TableSizeReader.TableSizeDetails tableSizeDetails = testRunner(servers, table); Assert.assertNull(tableSizeDetails._offlineSegments); TableSizeReader.TableSubTypeSizeDetails realtimeSegments = tableSizeDetails._realtimeSegments; Assert.assertEquals(realtimeSegments._segments.size(), 2); Assert.assertTrue(realtimeSegments._reportedSizeInBytes == realtimeSegments._estimatedSizeInBytes); validateTableSubTypeSize(servers, realtimeSegments); + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(table); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), + realtimeSegments._estimatedSizeInBytes / NUM_REPLICAS); + Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, + ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), realtimeSegments._estimatedSizeInBytes); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java index b701258..c9a119b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java @@ -78,7 +78,7 @@ public class ZKOperatorTest { zkOperator .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, true, httpHeaders, "downloadUrl", - true, "crypter", true); + true, "crypter", true, 10); fail(); } catch (Exception e) { // Expected @@ -91,10 +91,9 @@ public class ZKOperatorTest { return segmentZKMetadata == null; }, 30_000L, "Failed to delete segmentZkMetadata."); - zkOperator .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, "downloadUrl", - false, "crypter", true); + false, "crypter", true, 10); SegmentZKMetadata segmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); @@ -107,11 +106,12 @@ public class ZKOperatorTest { assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl"); assertEquals(segmentZKMetadata.getCrypterName(), "crypter"); assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1); + assertEquals(segmentZKMetadata.getSizeInBytes(), 10); // Upload the same segment with allowRefresh = false. Validate that an exception is thrown. try { zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, - "otherDownloadUrl", false, "otherCrypter", false); + "otherDownloadUrl", false, "otherCrypter", false, 10); fail(); } catch (Exception e) { // Expected @@ -121,7 +121,7 @@ public class ZKOperatorTest { when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123"); try { zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, - "otherDownloadUrl", false, null, true); + "otherDownloadUrl", false, null, true, 10); fail(); } catch (Exception e) { // Expected @@ -132,7 +132,7 @@ public class ZKOperatorTest { when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345"); when(segmentMetadata.getIndexCreationTime()).thenReturn(456L); zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, - "otherDownloadUrl", false, "otherCrypter", true); + "otherDownloadUrl", false, "otherCrypter", true, 10); segmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertEquals(segmentZKMetadata.getCrc(), 12345L); @@ -146,6 +146,7 @@ public class ZKOperatorTest { assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl"); assertEquals(segmentZKMetadata.getCrypterName(), "crypter"); assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1); + assertEquals(segmentZKMetadata.getSizeInBytes(), 10); // Refresh the segment with a different segment (different CRC) when(segmentMetadata.getCrc()).thenReturn("23456"); @@ -155,7 +156,7 @@ public class ZKOperatorTest { // not found!" exception from being thrown sporadically. Thread.sleep(1000L); zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, - "otherDownloadUrl", false, "otherCrypter", true); + "otherDownloadUrl", false, "otherCrypter", true, 10); segmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertEquals(segmentZKMetadata.getCrc(), 23456L); @@ -166,6 +167,7 @@ public class ZKOperatorTest { assertTrue(segmentZKMetadata.getRefreshTime() > refreshTime); assertEquals(segmentZKMetadata.getDownloadUrl(), "otherDownloadUrl"); assertEquals(segmentZKMetadata.getCrypterName(), "otherCrypter"); + assertEquals(segmentZKMetadata.getSizeInBytes(), 10); } @AfterClass diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 2ff7cec..dbcc051 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -22,6 +22,8 @@ import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.helix.AccessOption; import org.apache.helix.ZNRecord; @@ -40,6 +42,7 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -47,6 +50,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -61,6 +65,8 @@ public class SegmentStatusCheckerTest { private PinotMetricsRegistry _metricsRegistry; private ControllerMetrics _controllerMetrics; private ControllerConf _config; + private TableSizeReader _tableSizeReader; + private ExecutorService _executorService = Executors.newFixedThreadPool(1); @Test public void offlineBasicTest() @@ -126,10 +132,16 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert @@ -148,6 +160,8 @@ public class SegmentStatusCheckerTest { 66); Assert.assertEquals( _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test @@ -201,10 +215,16 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert.assertEquals( @@ -221,7 +241,8 @@ public class SegmentStatusCheckerTest { } @Test - public void missingEVPartitionTest() { + public void missingEVPartitionTest() + throws Exception { String offlineTableName = "myTable_OFFLINE"; List<String> allTableNames = new ArrayList<String>(); allTableNames.add(offlineTableName); @@ -254,6 +275,7 @@ public class SegmentStatusCheckerTest { znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0"); znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); + znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111); ZkHelixPropertyStore<ZNRecord> propertyStore; { @@ -281,10 +303,16 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert.assertEquals( @@ -294,6 +322,8 @@ public class SegmentStatusCheckerTest { 0); Assert.assertEquals( _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); } @Test @@ -330,14 +360,22 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), 0); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test @@ -362,10 +400,16 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), @@ -374,10 +418,13 @@ public class SegmentStatusCheckerTest { Long.MIN_VALUE); Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS), Long.MIN_VALUE); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test - public void missingEVPartitionPushTest() { + public void missingEVPartitionPushTest() + throws Exception { String offlineTableName = "myTable_OFFLINE"; List<String> allTableNames = new ArrayList<String>(); allTableNames.add(offlineTableName); @@ -408,6 +455,7 @@ public class SegmentStatusCheckerTest { znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0"); znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); + znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111); ZNRecord znrecord2 = new ZNRecord("myTable_2"); znrecord2.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1"); @@ -420,6 +468,7 @@ public class SegmentStatusCheckerTest { znrecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_2"); znrecord2.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); znrecord2.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); + znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111); { _helixResourceManager = mock(PinotHelixResourceManager.class); @@ -442,10 +491,16 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert.assertEquals( @@ -459,6 +514,8 @@ public class SegmentStatusCheckerTest { 100); Assert.assertEquals( _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + Assert.assertEquals( + _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test @@ -491,10 +548,16 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); @@ -538,7 +601,8 @@ public class SegmentStatusCheckerTest { _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); // verify state before test Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0); // update metrics @@ -577,7 +641,8 @@ public class SegmentStatusCheckerTest { _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); // verify state before test Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0); // update metrics @@ -624,10 +689,16 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + { + _tableSizeReader = mock(TableSizeReader.class); + when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); + } _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); + _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java index 2d351d7..2d42bf7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java @@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -94,8 +95,10 @@ public class StorageQuotaCheckerTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); _tableSizeReader = mock(TableSizeReader.class); ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); - _storageQuotaChecker = new StorageQuotaChecker(tableConfig, _tableSizeReader, controllerMetrics, true, - mock(PinotHelixResourceManager.class)); + PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + when(pinotHelixResourceManager.getNumReplicas(eq(tableConfig))).thenReturn(NUM_REPLICAS); + _storageQuotaChecker = + new StorageQuotaChecker(tableConfig, _tableSizeReader, controllerMetrics, true, pinotHelixResourceManager); tableConfig.setQuotaConfig(new QuotaConfig("2.8K", null)); // No response from server, should pass without updating metrics diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java index 335d8f4..fcac9a0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java @@ -89,7 +89,8 @@ public class ValidationManagerTest { }, 30_000L, "Failed to find the segment in the ExternalView"); Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime())); ControllerTestUtils.getHelixResourceManager() - .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null); + .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null, + -1); segmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME, TEST_SEGMENT_NAME); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 7e5c956..7cda305 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -550,6 +550,7 @@ public class CommonConstants { public static final String CRYPTER_NAME = "segment.crypter"; public static final String PARTITION_METADATA = "segment.partition.metadata"; public static final String CUSTOM_MAP = "custom.map"; + public static final String SIZE_IN_BYTES = "segment.size.in.bytes"; /** * This field is used for parallel push protection to lock the segment globally. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org