This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch segment-merge-lineage in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 156cd4c326230a4497a12ba7d086bc6f2f402f1c Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Sun Nov 25 22:10:45 2018 -0800 Segment merge lineage data structure 1. Added segment merge lineage that is a wrapper class of ZNRecord 2. Added segment merge group that will be used by broker during segment selection process 3. Added a unit test --- .../pinot/common/lineage/SegmentGroup.java | 72 +++++ .../pinot/common/lineage/SegmentMergeLineage.java | 313 +++++++++++++++++++++ .../lineage/SegmentMergeLineageAccessHelper.java | 82 ++++++ .../pinot/common/metadata/ZKMetadataProvider.java | 7 +- .../common/lineage/SegmentMergeLineageTest.java | 106 +++++++ 5 files changed, 579 insertions(+), 1 deletion(-) diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java new file mode 100644 index 0000000..fb28ac1 --- /dev/null +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-c...@linkedin.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.common.lineage; + +import java.util.List; +import java.util.Set; + + +/** + * Class to represent segment group + */ +public class SegmentGroup { + + private String _groupId; + private int _groupLevel; + private SegmentGroup _parentGroup; + private List<SegmentGroup> _childrenGroups; + private Set<String> _segments; + + public String getGroupId() { + return _groupId; + } + + public void setGroupId(String groupId) { + _groupId = groupId; + } + + public SegmentGroup getParentGroup() { + return _parentGroup; + } + + public void setParentGroup(SegmentGroup parentGroup) { + _parentGroup = parentGroup; + } + + public List<SegmentGroup> getChildrenGroups() { + return _childrenGroups; + } + + public void setChildrenGroups(List<SegmentGroup> childrenGroups) { + _childrenGroups = childrenGroups; + } + + public Set<String> getSegments() { + return _segments; + } + + public void setSegments(Set<String> segments) { + _segments = segments; + } + + public int getGroupLevel() { + return _groupLevel; + } + + public void setGroupLevel(int groupLevel) { + _groupLevel = groupLevel; + } +} diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java new file mode 100644 index 0000000..0ec0be1 --- /dev/null +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java @@ -0,0 +1,313 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-c...@linkedin.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.common.lineage; + +import com.linkedin.pinot.common.exception.InvalidConfigException; +import com.linkedin.pinot.common.utils.EqualityUtils; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import org.apache.helix.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class to represent segment merge lineage information + */ +public class SegmentMergeLineage { + + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeLineage.class); + + private static final String LEVEL_KEY_PREFIX = "level_"; + private static final String ROOT_NODE_GROUP_ID = "root"; + private static final String SEGMENT_DELIMITER = ","; + private static final int DEFAULT_GROUP_LEVEL = 0; + + private String _tableNameWithType; + private Map<String, List<String>> _parentGroupToChildrenGroupsMap; + private Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap; + + public SegmentMergeLineage(String tableNameWithType) { + _tableNameWithType = tableNameWithType; + _parentGroupToChildrenGroupsMap = new HashMap<>(); + _levelToGroupToSegmentsMap = new HashMap<>(); + } + + public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap, + Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap, int version) { + _tableNameWithType = tableNameWithType; + _parentGroupToChildrenGroupsMap = segmentGroupLineageMap; + _levelToGroupToSegmentsMap = levelToGroupToSegmentMap; + } + + public String getTableName() { + return _tableNameWithType; + } + + public static SegmentMergeLineage fromZNRecord(ZNRecord record) { + String tableNameWithType = record.getId(); + int version = record.getVersion(); + Map<String, List<String>> segmentGroupLineageMap = record.getListFields(); + + Map<Integer, Map<String, List<String>>> groupToSegmentsMap = new HashMap<>(); + for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) { + String levelKey = entry.getKey(); + Integer level = Integer.parseInt(levelKey.substring(LEVEL_KEY_PREFIX.length())); + Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>(); + for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) { + String groupId = groupEntry.getKey(); + String segmentsString = groupEntry.getValue(); + List<String> segments = Arrays.asList(segmentsString.split(SEGMENT_DELIMITER)); + groupToSegmentsForLevel.put(groupId, new ArrayList<>(segments)); + } + groupToSegmentsMap.put(level, groupToSegmentsForLevel); + } + return new SegmentMergeLineage(tableNameWithType, segmentGroupLineageMap, groupToSegmentsMap, version); + } + + public ZNRecord toZNRecord() { + ZNRecord record = new ZNRecord(_tableNameWithType); + record.setListFields(_parentGroupToChildrenGroupsMap); + Map<String, Map<String, String>> groupToSegmentsMap = new HashMap<>(); + + for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) { + String key = LEVEL_KEY_PREFIX + entry.getKey(); + Map<String, String> groupSegmentsForLevel = new HashMap<>(); + for (Map.Entry<String, List<String>> groupEntry : entry.getValue().entrySet()) { + String groupId = groupEntry.getKey(); + String segments = String.join(SEGMENT_DELIMITER, groupEntry.getValue()); + groupSegmentsForLevel.put(groupId, segments); + } + groupToSegmentsMap.put(key, groupSegmentsForLevel); + } + record.setMapFields(groupToSegmentsMap); + + return record; + } + + /** + * Add segment merge lineage information + * + * @param groupId a group id + * @param currentGroupSegments a list of segments that belongs to the group + * @param childrenGroups a list of children groups that the current group covers + */ + public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups) + throws InvalidConfigException { + // Get group level + Integer groupLevel = getGroupLevel(childrenGroups); + + // Update group to segments map + Map<String, List<String>> groupToSegmentMap = + _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>()); + if (groupToSegmentMap.containsKey(groupId)) { + throw new InvalidConfigException("Group id : " + groupId + " already exists."); + } + groupToSegmentMap.put(groupId, new ArrayList<>(currentGroupSegments)); + _levelToGroupToSegmentsMap.put(groupLevel, groupToSegmentMap); + + // Update segment group lineage map + if (groupLevel > DEFAULT_GROUP_LEVEL) { + if (_parentGroupToChildrenGroupsMap.containsKey(groupId)) { + throw new InvalidConfigException("Group id : " + groupId + " already exists."); + } else { + _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups)); + } + } + + LOGGER.info("New group has been added successfully to the segment lineage. (groupId: {}, currentGroupSegments: {}, " + + "childrenGroups: {}", groupId, currentGroupSegments, childrenGroups); + } + + /** + * Remove segment merge information given a group id + * + * @param groupId a group id + */ + public void removeSegmentGroup(String groupId) { + // Clean up the group id from parent to children group mapping + _parentGroupToChildrenGroupsMap.remove(groupId); + for (List<String> childrenGroups : _parentGroupToChildrenGroupsMap.values()) { + childrenGroups.remove(groupId); + } + + // Clean up the group id from group to segments mapping + for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) { + groupToSegments.remove(groupId); + } + + LOGGER.info("Group {} has been successfully removed.", groupId); + } + + /** + * Construct a lineage tree and returns the root node + * + * @return a root node for lineage tree + */ + public SegmentGroup getMergeLineageRootSegmentGroup() { + // Create group nodes + Map<String, SegmentGroup> groupNodes = new HashMap<>(); + for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) { + Integer level = groupEntryForLevel.getKey(); + Map<String, List<String>> groupToSegmentsForLevel = groupEntryForLevel.getValue(); + for (Map.Entry<String, List<String>> entry : groupToSegmentsForLevel.entrySet()) { + String groupId = entry.getKey(); + List<String> segments = entry.getValue(); + SegmentGroup groupNode = new SegmentGroup(); + groupNode.setGroupId(groupId); + groupNode.setSegments(new HashSet<>(segments)); + groupNode.setGroupLevel(level); + groupNodes.put(groupId, groupNode); + } + } + + // Add edges by updating children & parent group information + for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) { + String parentGroupId = lineageEntry.getKey(); + List<String> childrenGroupIds = lineageEntry.getValue(); + List<SegmentGroup> childrenGroups = new ArrayList<>(); + SegmentGroup parentNode = groupNodes.get(parentGroupId); + for (String groupId : childrenGroupIds) { + SegmentGroup childNode = groupNodes.get(groupId); + if (childNode != null) { + childrenGroups.add(childNode); + childNode.setParentGroup(parentNode); + } + } + parentNode.setChildrenGroups(childrenGroups); + } + + // Create a root node + SegmentGroup root = new SegmentGroup(); + root.setGroupId(ROOT_NODE_GROUP_ID); + List<SegmentGroup> childrenForRoot = new ArrayList<>(); + for (SegmentGroup group : groupNodes.values()) { + if (group.getParentGroup() == null) { + group.setParentGroup(root); + childrenForRoot.add(group); + } + } + root.setChildrenGroups(childrenForRoot); + + return root; + } + + /** + * Get a list of segments for a given group id + * + * @param groupId a group id + * @return a list of segments that belongs to the given group id, null if the group does not exist + */ + public List<String> getSegmentsForGroup(String groupId) { + for (Map<String, List<String>> groupToSegmentMap : _levelToGroupToSegmentsMap.values()) { + List<String> segments = groupToSegmentMap.get(groupId); + if (segments != null) { + return segments; + } + } + return null; + } + + /** + * Get a list of children group ids for a given group id + * + * @param groupId a group id + * @return a list of children groups that are covered by the given group id, null if the group does not exist + */ + public List<String> getChildrenForGroup(String groupId) { + return _parentGroupToChildrenGroupsMap.get(groupId); + } + + /** + * Get a list of all group levels + * + * @return a list of all group levels + */ + public List<Integer> getAllGroupLevels() { + List<Integer> groupLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet()); + Collections.sort(groupLevels); + return groupLevels; + } + + /** + * Get a list of group ids for a given group level + * + * @param groupLevel a group level + * @return a list of group ids that belongs to the given group level, null if the group level does not exist + */ + public List<String> getGroupIdsForGroupLevel(int groupLevel) { + Map<String, List<String>> groupToSegmentsMap = _levelToGroupToSegmentsMap.get(groupLevel); + if (groupToSegmentsMap != null) { + return new ArrayList<>(groupToSegmentsMap.keySet()); + } + return null; + } + + /** + * Helper function to compute group level given children groups + * + * @param childrenGroups a list of children group ids + * @return group level + */ + private Integer getGroupLevel(List<String> childrenGroups) throws InvalidConfigException { + // If no children exists, the group belongs to the base level. + if (childrenGroups == null || childrenGroups.isEmpty()) { + return DEFAULT_GROUP_LEVEL; + } + + for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) { + Integer currentLevel = entry.getKey(); + Map<String, List<String>> currentLevelGroupToSegmentsMap = entry.getValue(); + if (currentLevelGroupToSegmentsMap.keySet().containsAll(childrenGroups)) { + return currentLevel + 1; + } + } + + // At this point, not all children groups are covered, cannot add group + throw new InvalidConfigException("Cannot compute group level because not all children groups exist " + + "in the segment merge lineage, children groups: " + childrenGroups); + } + + @Override + public boolean equals(Object o) { + if (EqualityUtils.isSameReference(this, o)) { + return true; + } + + if (EqualityUtils.isNullOrNotSameClass(this, o)) { + return false; + } + + SegmentMergeLineage that = (SegmentMergeLineage) o; + + return EqualityUtils.isEqual(_tableNameWithType, that._tableNameWithType) && EqualityUtils.isEqual( + _parentGroupToChildrenGroupsMap, that._parentGroupToChildrenGroupsMap) && EqualityUtils.isEqual( + _levelToGroupToSegmentsMap, that._levelToGroupToSegmentsMap); + } + + @Override + public int hashCode() { + int result = EqualityUtils.hashCodeOf(_tableNameWithType); + result = EqualityUtils.hashCodeOf(result, _parentGroupToChildrenGroupsMap); + result = EqualityUtils.hashCodeOf(result, _levelToGroupToSegmentsMap); + return result; + } +} diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java new file mode 100644 index 0000000..3f76c7c --- /dev/null +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-c...@linkedin.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.common.lineage; + +import com.linkedin.pinot.common.metadata.ZKMetadataProvider; +import com.linkedin.pinot.common.utils.retry.RetryPolicies; +import com.linkedin.pinot.common.utils.retry.RetryPolicy; +import java.util.List; +import org.apache.helix.AccessOption; +import org.apache.helix.PropertyPathConfig; +import org.apache.helix.PropertyType; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.zookeeper.data.Stat; + + +/** + * Class to help to read, write segment merge lineage + */ +public class SegmentMergeLineageAccessHelper { + + /** + * Read the segment merge lineage from the property store. Whenever we need the version + * + * @param propertyStore a property store + * @param tableNameWithType a table name with type + * @return a ZNRecord of segment merge lineage + */ + public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore, + String tableNameWithType) { + String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType); + Stat stat = new Stat(); + ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT); + if (segmentMergeLineageZNRecord != null) { + segmentMergeLineageZNRecord.setVersion(stat.getVersion()); + } + return segmentMergeLineageZNRecord; + } + + /** + * Read the segment merge lineage from the property store + * + * @param propertyStore a property store + * @param tableNameWithType a table name with type + * @return a segment merge lineage + */ + public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore, + String tableNameWithType) { + String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType); + ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, null, AccessOption.PERSISTENT); + return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord); + } + + /** + * Write the segment merge lineage to the property store + * + * @param propertyStore a property store + * @param segmentMergeLineage a segment merge lineage + * @return true if update is successful. false otherwise. + */ + public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore, + SegmentMergeLineage segmentMergeLineage, int expectedVersion) { + String tableNameWithType = segmentMergeLineage.getTableName(); + String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType); + return propertyStore.set(path, segmentMergeLineage.toZNRecord(), expectedVersion, AccessOption.PERSISTENT); + } +} diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java index fe55a0c..3bc2d56 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java @@ -53,6 +53,7 @@ public class ZKMetadataProvider { private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE"; private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE"; private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER"; + private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE = "/SEGMENT_MERGE_LINEAGE"; public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName, ZNRecord znRecord) { @@ -106,6 +107,10 @@ public class ZKMetadataProvider { return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX, controllerConfigKey); } + public static String constructPropertyStorePathForSegmentMergeLineage(String tableNameWithType) { + return StringUtil.join("/", PROPERTYSTORE_SEGMENT_MERGE_LINEAGE, tableNameWithType); + } + public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource, String segmentName) { return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName), @@ -251,7 +256,7 @@ public class ZKMetadataProvider { * Get the schema associated with the given table name. * * @param propertyStore Helix property store - * @param tableName Table name with or without type suffix. + * @param tableName Table name with or without type suffix.FtaskTypeConfigsMap * @return Schema associated with the given table name. */ @Nullable diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java new file mode 100644 index 0000000..923fbfc --- /dev/null +++ b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-c...@linkedin.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.common.lineage; + +import com.linkedin.pinot.common.exception.InvalidConfigException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class SegmentMergeLineageTest { + + @Test + public void testSegmentMergeLineage() throws Exception { + SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE"); + String groupId1 = "G1"; + List<String> groupSegments1 = Arrays.asList(new String[]{"segment1", "segment2", "segment3"}); + segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null); + Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1); + + String groupId2 = "G2"; + List<String> groupSegments2 = Arrays.asList(new String[]{"segment4", "segment5"}); + segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null); + Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2); + + String groupId3 = "G3"; + List<String> groupSegments3 = Arrays.asList(new String[]{"segment6"}); + segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, Arrays.asList(new String[]{groupId1, groupId2})); + Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3); + + // Check available APIs + Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE"); + Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3), + Arrays.asList(new String[]{groupId1, groupId2})); + Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(new Integer[]{0, 1})); + Assert.assertTrue(segmentMergeLineage.equals(SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()))); + Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(0), + Arrays.asList(new String[]{groupId1, groupId2})); + Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(1), + Arrays.asList(new String[]{groupId3})); + validateSegmentGroup(segmentMergeLineage); + + // Check ZNRecord conversion + Assert.assertEquals(segmentMergeLineage, SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord())); + + // Test removing groups + segmentMergeLineage.removeSegmentGroup(groupId1); + Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1)); + Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1)); + Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1)); + } + + @Test(expectedExceptions = InvalidConfigException.class) + public void testUpdateWithDuplicateGroupId() throws Exception { + SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE"); + String groupId1 = "G1"; + List<String> groupSegments1 = Arrays.asList(new String[]{"segment1, segment2, segment3"}); + segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null); + Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1); + + List<String> groupSegments2 = Arrays.asList(new String[]{"segment4, segment5, segment6"}); + segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null); + } + + private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) { + SegmentGroup segmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup(); + for (SegmentGroup child : segmentGroup.getChildrenGroups()) { + validateSegmentGroupNode(child, segmentMergeLineage); + } + } + + private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) { + String groupId = segmentGroup.getGroupId(); + Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(groupId))); + Assert.assertTrue(segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(groupId)); + + List<SegmentGroup> childrenGroups = segmentGroup.getChildrenGroups(); + if (childrenGroups != null) { + List<String> childrenGroupIds = new ArrayList<>(); + for (SegmentGroup child : childrenGroups) { + childrenGroupIds.add(child.getGroupId()); + } + Assert.assertEquals(childrenGroupIds, segmentMergeLineage.getChildrenForGroup(groupId)); + + for (SegmentGroup child : segmentGroup.getChildrenGroups()) { + validateSegmentGroupNode(child, segmentMergeLineage); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org