This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 947b47e3f4 Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334) 947b47e3f4 is described below commit 947b47e3f49bb7434dbb6f47b6c538fa91c61084 Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Fri Feb 9 11:42:03 2024 +0530 Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (#12334) --- .../common/restlet/resources/TableMetadataInfo.java | 10 +++++++++- .../controller/util/ServerSegmentMetadataReader.java | 11 ++++++++++- .../data/manager/realtime/RealtimeTableDataManager.java | 12 ++++++++++++ .../tests/models/DummyTableUpsertMetadataManager.java | 6 ++++++ .../upsert/ConcurrentMapTableUpsertMetadataManager.java | 10 ++++++++++ .../local/upsert/TableUpsertMetadataManager.java | 8 ++++++++ .../pinot/server/api/resources/TablesResource.java | 17 ++++++++++++++++- 7 files changed, 71 insertions(+), 3 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java index 27e28ab376..4a6953ac2c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java @@ -43,6 +43,7 @@ public class TableMetadataInfo { private final Map<String, Double> _columnCardinalityMap; private final Map<String, Double> _maxNumMultiValuesMap; private final Map<String, Map<String, Double>> _columnIndexSizeMap; + private final Map<Integer, Map<String, Long>> _upsertPartitionToServerPrimaryKeyCountMap; @JsonCreator public TableMetadataInfo(@JsonProperty("tableName") String tableName, @@ -50,7 +51,9 @@ public class TableMetadataInfo { @JsonProperty("numRows") long numRows, @JsonProperty("columnLengthMap") Map<String, Double> columnLengthMap, @JsonProperty("columnCardinalityMap") Map<String, Double> columnCardinalityMap, @JsonProperty("maxNumMultiValuesMap") Map<String, Double> maxNumMultiValuesMap, - @JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap) { + @JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap, + @JsonProperty("upsertPartitionToServerPrimaryKeyCountMap") + Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap) { _tableName = tableName; _diskSizeInBytes = sizeInBytes; _numSegments = numSegments; @@ -59,6 +62,7 @@ public class TableMetadataInfo { _columnCardinalityMap = columnCardinalityMap; _maxNumMultiValuesMap = maxNumMultiValuesMap; _columnIndexSizeMap = columnIndexSizeMap; + _upsertPartitionToServerPrimaryKeyCountMap = upsertPartitionToServerPrimaryKeyCountMap; } public String getTableName() { @@ -92,4 +96,8 @@ public class TableMetadataInfo { public Map<String, Map<String, Double>> getColumnIndexSizeMap() { return _columnIndexSizeMap; } + + public Map<Integer, Map<String, Long>> getUpsertPartitionToServerPrimaryKeyCountMap() { + return _upsertPartitionToServerPrimaryKeyCountMap; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index f728d51635..cdcc7dc78c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -112,6 +112,7 @@ public class ServerSegmentMetadataReader { final Map<String, Double> columnCardinalityMap = new HashMap<>(); final Map<String, Double> maxNumMultiValuesMap = new HashMap<>(); final Map<String, Map<String, Double>> columnIndexSizeMap = new HashMap<>(); + final Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>(); for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { try { TableMetadataInfo tableMetadataInfo = @@ -128,6 +129,14 @@ public class ServerSegmentMetadataReader { } return l; })); + tableMetadataInfo.getUpsertPartitionToServerPrimaryKeyCountMap().forEach( + (partition, serverToPrimaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.merge(partition, + new HashMap<>(serverToPrimaryKeyCount), (l, r) -> { + for (Map.Entry<String, Long> serverToPKCount : r.entrySet()) { + l.merge(serverToPKCount.getKey(), serverToPKCount.getValue(), Long::sum); + } + return l; + })); } catch (IOException e) { failedParses++; LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); @@ -151,7 +160,7 @@ public class ServerSegmentMetadataReader { TableMetadataInfo aggregateTableMetadataInfo = new TableMetadataInfo(tableNameWithType, totalDiskSizeInBytes, totalNumSegments, totalNumRows, columnLengthMap, - columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap); + columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap, upsertPartitionToServerPrimaryKeyCountMap); if (failedParses != 0) { LOGGER.warn("Failed to parse {} / {} aggregated segment metadata responses from servers.", failedParses, serverUrls.size()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index d974663b20..c3cb5c603a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -708,6 +708,18 @@ public class RealtimeTableDataManager extends BaseTableDataManager { return _tableUpsertMetadataManager; } + /** + * Retrieves a mapping of partition id to the primary key count for the partition. + * + * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific partition. + */ + public Map<Integer, Long> getUpsertPartitionToPrimaryKeyCount() { + if (isUpsertEnabled()) { + return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount(); + } + return Collections.emptyMap(); + } + /** * Validate a schema against the table config for real-time record consumption. * Ideally, we should validate these things when schema is added or table is created, but either of these diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java index 502834a6b6..a549389697 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.helix.HelixManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -72,6 +73,11 @@ public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataMana public void stop() { } + @Override + public Map<Integer, Long> getPartitionToPrimaryKeyCount() { + return Collections.emptyMap(); + } + @Override public void close() throws IOException { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index b0593f8d5f..6d48464b69 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.upsert; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.concurrent.ThreadSafe; @@ -45,6 +46,15 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta } } + @Override + public Map<Integer, Long> getPartitionToPrimaryKeyCount() { + Map<Integer, Long> partitionToPrimaryKeyCount = new HashMap<>(); + _partitionMetadataManagerMap.forEach( + (partitionID, upsertMetadataManager) -> partitionToPrimaryKeyCount.put(partitionID, + upsertMetadataManager.getNumPrimaryKeys())); + return partitionToPrimaryKeyCount; + } + @Override public void close() throws IOException { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index 2ac107d790..3e98030d8a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.upsert; import java.io.Closeable; +import java.util.Map; import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -47,5 +48,12 @@ public interface TableUpsertMetadataManager extends Closeable { */ void stop(); + /** + * Retrieves a mapping of partition id to the primary key count for the partition. + * + * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific partition + */ + Map<Integer, Long> getPartitionToPrimaryKeyCount(); + boolean isPreloading(); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 5e458bbb28..aad233bc77 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -80,6 +80,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -286,9 +287,23 @@ public class TablesResource { } } + // fetch partition to primary key count for realtime tables that have upsert enabled + Map<Integer, Long> upsertPartitionToPrimaryKeyCountMap = new HashMap<>(); + if (tableDataManager instanceof RealtimeTableDataManager) { + RealtimeTableDataManager realtimeTableDataManager = (RealtimeTableDataManager) tableDataManager; + upsertPartitionToPrimaryKeyCountMap = realtimeTableDataManager.getUpsertPartitionToPrimaryKeyCount(); + } + + // construct upsertPartitionToServerPrimaryKeyCountMap to populate in TableMetadataInfo + Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>(); + upsertPartitionToPrimaryKeyCountMap.forEach( + (partition, primaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.put(partition, + Map.of(instanceDataManager.getInstanceId(), primaryKeyCount))); + TableMetadataInfo tableMetadataInfo = new TableMetadataInfo(tableDataManager.getTableName(), totalSegmentSizeBytes, segmentDataManagers.size(), - totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap); + totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap, + upsertPartitionToServerPrimaryKeyCountMap); return ResourceUtils.convertToJsonString(tableMetadataInfo); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org