Jackie-Jiang commented on code in PR #8584: URL: https://github.com/apache/pinot/pull/8584#discussion_r864295648
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -104,18 +104,15 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
   @Override
   public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
-        _realtimeTableName);
+    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
+    InstancePartitions instancePartitions = instancePartitionsMap.entrySet().iterator().next().getValue();

Review Comment:
We should check the `InstancePartitionsType` of the entry to decide how to assign the segment. The current logic handles the `CONSUMING` type only. We should add a branch for the `COMPLETED` type, which should be similar to `OfflineSegmentAssignment.assignSegment()`.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1995,23 +1992,23 @@ public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithTy
   public void assignTableSegment(String tableNameWithType, String segmentName) {
     String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
-    InstancePartitionsType instancePartitionsType;
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
-      // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
-      // TODO When upload segments is open to all realtime tables, we should change the type to COMPLETED instead.
-      // In addition, RealtimeSegmentAssignment.assignSegment(..) method should be updated so that the method does not
-      // assign segments to CONSUMING instance partition only.
-      instancePartitionsType = InstancePartitionsType.CONSUMING;
-    } else {
-      instancePartitionsType = InstancePartitionsType.OFFLINE;
-    }
     // Assign instances for the segment and add it into IdealState
     try {
       TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
-      SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+      InstancePartitionsType instancePartitionsType;
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {

Review Comment:
We should use the `COMPLETED` type only if the `COMPLETED` instance partitions exist or the tag override is configured. If segment relocation is not configured, we should follow the same assignment as for the `CONSUMING` segments.
Let's keep the comments for the upsert case. For upsert segments, we should always use the `CONSUMING` type.

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -359,6 +359,16 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
     _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
   }

+  /*
+   * This method is implemented to allow refreshing the segments in realtime tables.
+   */
+  @Override
+  public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)

Review Comment:
Move this implementation to the `BaseTableDataManager`.
It is common to both offline and realtime tables.

##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,32 +35,27 @@ private SegmentUtils() {
   // Returns the partition id of a realtime segment based segment name and segment metadata info retrieved via Helix.
   // Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution
   // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
-      HelixManager helixManager, String partitionColumn) {
+  public static @Nullable
+  Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager,

Review Comment:
(convention)
```suggestion
  @Nullable
  public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager,
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
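
Regarding the `PinotHelixResourceManager.assignTableSegment` comment above, here is a minimal sketch of the type selection being requested. It is not the PR's code: the `PartitionsType` enum is a hypothetical stand-in for `InstancePartitionsType`, and `AssignmentTypeChooser`, `choose`, and the four boolean parameters are illustrative names only. How each flag would actually be derived from the table config or instance partitions is left open here.

```java
// Hypothetical stand-in for Pinot's InstancePartitionsType (only the values named in this review).
enum PartitionsType { OFFLINE, CONSUMING, COMPLETED }

// Illustrative helper, not a Pinot class: encodes the rules stated in the review comment.
final class AssignmentTypeChooser {
  private AssignmentTypeChooser() {
  }

  static PartitionsType choose(boolean isRealtimeTable, boolean isUpsertEnabled,
      boolean hasCompletedInstancePartitions, boolean hasCompletedTagOverride) {
    if (!isRealtimeTable) {
      // Offline tables keep using the OFFLINE type.
      return PartitionsType.OFFLINE;
    }
    if (isUpsertEnabled) {
      // Upsert segments should always use the CONSUMING type.
      return PartitionsType.CONSUMING;
    }
    if (hasCompletedInstancePartitions || hasCompletedTagOverride) {
      // Relocation is configured, so the uploaded segment goes to COMPLETED.
      return PartitionsType.COMPLETED;
    }
    // No relocation configured: follow the same assignment as CONSUMING segments.
    return PartitionsType.CONSUMING;
  }
}
```

The upsert check comes before the `COMPLETED` check so that an upsert table stays on `CONSUMING` even when relocation happens to be configured, matching the "always use the `CONSUMING` type" requirement.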