chenboat commented on a change in pull request #6567: URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r596474328
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1649,63 +1651,100 @@ public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, Str public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter) { String segmentName = segmentMetadata.getName(); - String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); - + String tableNameWithType; + InstancePartitionsType instancePartitionsType; // NOTE: must first set the segment ZK metadata before assigning segment to instances because segment assignment // might need them to determine the partition of the segment, and server will need them to download the segment - OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata(); - ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata); - offlineSegmentZKMetadata.setDownloadUrl(downloadUrl); - offlineSegmentZKMetadata.setCrypterName(crypter); - offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis()); + ZNRecord znRecord; + + if (isUpsertTable(tableName)) { + tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + instancePartitionsType = InstancePartitionsType.CONSUMING; + // Build the realtime segment zk metadata with necessary fields. + LLCRealtimeSegmentZKMetadata segmentZKMetadata = new LLCRealtimeSegmentZKMetadata(); + ZKMetadataUtils + .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.REALTIME); + segmentZKMetadata.setDownloadUrl(downloadUrl); + segmentZKMetadata.setCrypterName(crypter); + segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOAD); + znRecord = segmentZKMetadata.toZNRecord(); + } else { + tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName); + instancePartitionsType = InstancePartitionsType.OFFLINE; + // Build the offline segment zk metadata with necessary fields. + OfflineSegmentZKMetadata segmentZKMetadata = new OfflineSegmentZKMetadata(); + ZKMetadataUtils + .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE); + segmentZKMetadata.setDownloadUrl(downloadUrl); + segmentZKMetadata.setCrypterName(crypter); + segmentZKMetadata.setPushTime(System.currentTimeMillis()); + znRecord = segmentZKMetadata.toZNRecord(); + } String segmentZKMetadataPath = - ZKMetadataProvider.constructPropertyStorePathForSegment(offlineTableName, segmentName); + ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName); Preconditions.checkState( - _propertyStore.set(segmentZKMetadataPath, offlineSegmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT), - "Failed to set segment ZK metadata for table: " + offlineTableName + ", segment: " + segmentName); - LOGGER.info("Added segment: {} of table: {} to property store", segmentName, offlineTableName); + _propertyStore.set(segmentZKMetadataPath, znRecord, AccessOption.PERSISTENT), + "Failed to set segment ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName); + LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType); + assignTableSegment(tableNameWithType, segmentName, segmentZKMetadataPath, instancePartitionsType); + } + + private void assignTableSegment(String tableNameWithType, String segmentName, String segmentZKMetadataPath, + InstancePartitionsType instancePartitionsType) { // Assign instances for the segment and add it into IdealState try { - TableConfig offlineTableConfig = getTableConfig(offlineTableName); + TableConfig tableConfig = getTableConfig(tableNameWithType); Preconditions - .checkState(offlineTableConfig != null, "Failed to find table config for table: " + offlineTableName); + .checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); SegmentAssignment segmentAssignment = - SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, offlineTableConfig); + SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig); Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections - .singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils - .fetchOrComputeInstancePartitions(_helixZkManager, offlineTableConfig, InstancePartitionsType.OFFLINE)); - synchronized (getTableUpdaterLock(offlineTableName)) { - HelixHelper.updateIdealState(_helixZkManager, offlineTableName, idealState -> { + .singletonMap(instancePartitionsType, InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType)); + synchronized (getTableUpdaterLock(tableNameWithType)) { + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { assert idealState != null; Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); if (currentAssignment.containsKey(segmentName)) { LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, - offlineTableName); + tableNameWithType); } else { List<String> assignedInstances = segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap); LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, - offlineTableName); - currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils + .getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } return idealState; }); - LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, offlineTableName); + LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType); } } catch (Exception e) { LOGGER .error("Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata", - segmentName, offlineTableName, e); + segmentName, tableNameWithType, e); if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) { - LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, offlineTableName); + LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType); } else { LOGGER - .error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, offlineTableName); + .error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType); } throw e; } } + public boolean isUpsertTable(String tableName) { + if (hasOfflineTable(tableName)) + return false; + if (!hasRealtimeTable(tableName)) Review comment: It is not redundant because a table which does not exist may reach here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org