Jackie-Jiang commented on code in PR #8653: URL: https://github.com/apache/pinot/pull/8653#discussion_r869568425
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java: ########## @@ -20,58 +20,106 @@ import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.creator.SegmentVersion; -import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; public class ZKMetadataUtils { private ZKMetadataUtils() { } - public static void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata, SegmentMetadata segmentMetadata) { - SegmentVersion segmentVersion = segmentMetadata.getVersion(); - if (segmentVersion != null) { - segmentZKMetadata.setIndexVersion(segmentVersion.name()); - } + /** + * Creates the segment ZK metadata for a new segment. + */ + public static SegmentZKMetadata createSegmentZKMetadata(String tableNameWithType, SegmentMetadata segmentMetadata, + String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentMetadata.getName()); + updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypter, + segmentSizeInBytes); + segmentZKMetadata.setPushTime(System.currentTimeMillis()); + return segmentZKMetadata; + } + + /** + * Refreshes the segment ZK metadata for a segment being replaced. 
+ */ + public static void refreshSegmentZKMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, + SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { + updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypter, + segmentSizeInBytes); + segmentZKMetadata.setRefreshTime(System.currentTimeMillis()); + } + + private static void updateSegmentZKMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, + SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { if (segmentMetadata.getTimeInterval() != null) { segmentZKMetadata.setStartTime(segmentMetadata.getStartTime()); segmentZKMetadata.setEndTime(segmentMetadata.getEndTime()); segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit()); + } else { + segmentZKMetadata.setStartTime(-1); + segmentZKMetadata.setEndTime(-1); + segmentZKMetadata.setTimeUnit(null); + } + if (segmentMetadata.getVersion() != null) { + segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name()); + } else { + segmentZKMetadata.setIndexVersion(null); } segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs()); - segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); + segmentZKMetadata.setSizeInBytes(segmentSizeInBytes); segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc())); - SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = - new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, - segmentZKMetadata.getCustomMap()); - segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap())); + segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); + segmentZKMetadata.setDownloadUrl(downloadUrl); + segmentZKMetadata.setCrypterName(crypter); - // Extract column partition metadata (if any), and set it into 
segment ZK metadata. + // Set partition metadata Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>(); - if (segmentMetadata instanceof SegmentMetadataImpl) { - for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) { - String column = entry.getKey(); - ColumnMetadata columnMetadata = entry.getValue(); - PartitionFunction partitionFunction = columnMetadata.getPartitionFunction(); - - if (partitionFunction != null) { - ColumnPartitionMetadata columnPartitionMetadata = - new ColumnPartitionMetadata(partitionFunction.getName(), partitionFunction.getNumPartitions(), - columnMetadata.getPartitions(), partitionFunction.getFunctionConfig()); - columnPartitionMap.put(column, columnPartitionMetadata); - } + for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) { + String column = entry.getKey(); + ColumnMetadata columnMetadata = entry.getValue(); + PartitionFunction partitionFunction = columnMetadata.getPartitionFunction(); + if (partitionFunction != null) { + ColumnPartitionMetadata columnPartitionMetadata = + new ColumnPartitionMetadata(partitionFunction.getName(), partitionFunction.getNumPartitions(), + columnMetadata.getPartitions(), partitionFunction.getFunctionConfig()); + columnPartitionMap.put(column, columnPartitionMetadata); } } - if (!columnPartitionMap.isEmpty()) { segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(columnPartitionMap)); + } else { + segmentZKMetadata.setPartitionMetadata(null); + } + + // Update custom metadata + // NOTE: Do not remove existing keys because they can be set by the HTTP header from the segment upload request + Map<String, String> customMap = segmentZKMetadata.getCustomMap(); + if (customMap == null) { + customMap = segmentMetadata.getCustomMap(); + } else { + customMap.putAll(segmentMetadata.getCustomMap()); + } + segmentZKMetadata.setCustomMap(customMap); + + // Set fields specific to realtime table + if 
(TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED); + + // NOTE: Keep offset as is if it is not explicitly set in the segment metadata + if (segmentMetadata.getStartOffset() != null) { + segmentZKMetadata.setStartOffset(segmentMetadata.getStartOffset()); + } + if (segmentMetadata.getEndOffset() != null) { + segmentZKMetadata.setEndOffset(segmentMetadata.getEndOffset()); + } Review Comment: That's the current logic. If the start/end offsets are set in the uploaded segment, we update them in the ZK metadata. If they are not set in the uploaded segment, we keep the ones in the ZK metadata unchanged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org