This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b8af790c2f Enable uploading segments to realtime tables (#8584) b8af790c2f is described below commit b8af790c2f26b5bafcc8f154e1dae2edc60b43c0 Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Mon May 16 18:48:40 2022 -0700 Enable uploading segments to realtime tables (#8584) --- .../apache/pinot/common/utils/SegmentUtils.java | 32 +++-- .../PinotSegmentUploadDownloadRestletResource.java | 13 +- .../helix/core/PinotHelixResourceManager.java | 48 +++++--- .../segment/RealtimeSegmentAssignment.java | 50 ++++---- ...altimeNonReplicaGroupSegmentAssignmentTest.java | 114 ++++++++++++++---- ...NonReplicaGroupTieredSegmentAssignmentTest.java | 5 +- .../RealtimeReplicaGroupSegmentAssignmentTest.java | 132 ++++++++++++++++----- .../core/data/manager/BaseTableDataManager.java | 3 +- .../manager/offline/OfflineTableDataManager.java | 12 -- .../manager/realtime/RealtimeTableDataManager.java | 5 +- .../tests/LLCRealtimeClusterIntegrationTest.java | 96 ++++++++++++++- .../tests/RealtimeClusterIntegrationTest.java | 16 ++- 12 files changed, 392 insertions(+), 134 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java index 21b3fb4f3e..04be4a1c58 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java @@ -19,7 +19,7 @@ package org.apache.pinot.common.utils; import com.google.common.base.Preconditions; -import java.util.Set; +import javax.annotation.Nullable; import org.apache.helix.HelixManager; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; @@ -35,32 +35,28 @@ public class SegmentUtils { // Returns the partition id of a realtime segment based segment name and segment metadata info retrieved via Helix. // Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution // path. - public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, - HelixManager helixManager, String partitionColumn) { + @Nullable + public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, + HelixManager helixManager, + String partitionColumn) { // A fast path if the segmentName is a LLC segment name and we can get the partition id from the name directly. if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) { return new LLCSegmentName(segmentName).getPartitionGroupId(); } - // Otherwise, retrieve the partition id from the segment zk metadata. Currently only realtime segments from upsert - // enabled tables have partition ids in their segment metadata. + // Otherwise, retrieve the partition id from the segment zk metadata. SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), realtimeTableName, segmentName); Preconditions .checkState(segmentZKMetadata != null, "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, realtimeTableName); SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata(); - Preconditions.checkState(segmentPartitionMetadata != null, - "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata", segmentName, - realtimeTableName); - ColumnPartitionMetadata columnPartitionMetadata = - segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn); - Preconditions.checkState(columnPartitionMetadata != null, - "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata for column: %s. Check " - + "if the table is an upsert table.", segmentName, realtimeTableName, partitionColumn); - Set<Integer> partitions = columnPartitionMetadata.getPartitions(); - Preconditions.checkState(partitions.size() == 1, - "Segment ZK metadata for segment: %s of table: %s contains multiple partitions for column: %s with %s", - segmentName, realtimeTableName, partitionColumn, partitions); - return partitions.iterator().next(); + if (segmentPartitionMetadata != null) { + ColumnPartitionMetadata columnPartitionMetadata = + segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn); + if (columnPartitionMetadata != null && columnPartitionMetadata.getPartitions().size() == 1) { + return columnPartitionMetadata.getPartitions().iterator().next(); + } + } + return null; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 01aa8c832b..0590fb2b1a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -292,16 +292,9 @@ public class PinotSegmentUploadDownloadRestletResource { LOGGER.warn("Table name is not provided as request query parameter when uploading segment: {} for table: {}", segmentName, rawTableName); } - String tableNameWithType; - if (tableType == TableType.OFFLINE) { - tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); - } else { - tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); - if (!_pinotHelixResourceManager.isUpsertTable(tableNameWithType)) { - throw new ControllerApplicationException(LOGGER, - "Cannot upload segment to non-upsert real-time table: " + tableNameWithType, Response.Status.FORBIDDEN); - } - } + String tableNameWithType = tableType == TableType.OFFLINE + ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName) + : TableNameBuilder.REALTIME.tableNameWithType(rawTableName); String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName(); LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, " diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f9d0e86447..49fe4cdd9e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -133,6 +133,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableCustomConfig; import org.apache.pinot.spi.config.table.TableStats; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TagOverrideConfig; import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; @@ -167,7 +168,7 @@ public class PinotHelixResourceManager { // TODO: make this configurable public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes - public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 secondL + public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'"); @@ -1997,26 +1998,14 @@ public class PinotHelixResourceManager { public void assignTableSegment(String tableNameWithType, String segmentName) { String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName); - InstancePartitionsType instancePartitionsType; - if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { - // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server - // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType. - // TODO When upload segments is open to all realtime tables, we should change the type to COMPLETED instead. - // In addition, RealtimeSegmentAssignment.assignSegment(..) method should be updated so that the method does not - // assign segments to CONSUMING instance partition only. - instancePartitionsType = InstancePartitionsType.CONSUMING; - } else { - instancePartitionsType = InstancePartitionsType.OFFLINE; - } // Assign instances for the segment and add it into IdealState try { TableConfig tableConfig = getTableConfig(tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + fetchOrComputeInstancePartitions(tableNameWithType, tableConfig); SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig); - Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections - .singletonMap(instancePartitionsType, InstancePartitionsUtils - .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType)); synchronized (getTableUpdaterLock(tableNameWithType)) { HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { assert idealState != null; @@ -2050,6 +2039,35 @@ public class PinotHelixResourceManager { } } + private Map<InstancePartitionsType, InstancePartitions> fetchOrComputeInstancePartitions(String tableNameWithType, + TableConfig tableConfig) { + if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + return Collections.singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.OFFLINE)); + } + if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) { + // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server + // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType. + return Collections.singletonMap(InstancePartitionsType.CONSUMING, InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.CONSUMING)); + } + // for non-upsert realtime tables, if COMPLETED instance partitions is available or tag override for + // completed segments is provided in the tenant config, COMPLETED instance partitions type is used + // otherwise CONSUMING instance partitions type is used. + InstancePartitionsType instancePartitionsType = InstancePartitionsType.COMPLETED; + InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, + InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); + if (instancePartitions != null) { + return Collections.singletonMap(instancePartitionsType, instancePartitions); + } + TagOverrideConfig tagOverrideConfig = tableConfig.getTenantConfig().getTagOverrideConfig(); + if (tagOverrideConfig == null || tagOverrideConfig.getRealtimeCompleted() == null) { + instancePartitionsType = InstancePartitionsType.CONSUMING; + } + return Collections.singletonMap(instancePartitionsType, + InstancePartitionsUtils.computeDefaultInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType)); + } + public boolean isUpsertTable(String tableName) { TableConfig realtimeTableConfig = getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName)); if (realtimeTableConfig == null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index c97b53e75c..3ede57c6d7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -104,18 +104,20 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { @Override public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); - Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", - _realtimeTableName); + Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided"); + Map.Entry<InstancePartitionsType, InstancePartitions> typeToInstancePartitions = + instancePartitionsMap.entrySet().iterator().next(); + InstancePartitionsType instancePartitionsType = typeToInstancePartitions.getKey(); + InstancePartitions instancePartitions = typeToInstancePartitions.getValue(); Preconditions .checkState(instancePartitions.getNumPartitions() == 1, "Instance partitions: %s should contain 1 partition", instancePartitions.getInstancePartitionsName()); LOGGER.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions, _realtimeTableName); checkReplication(instancePartitions); - - List<String> instancesAssigned = assignConsumingSegment(segmentName, instancePartitions); - + List<String> instancesAssigned = instancePartitionsType == InstancePartitionsType.CONSUMING + ? assignConsumingSegment(segmentName, instancePartitions) + : assignCompletedSegment(segmentName, currentAssignment, instancePartitions); LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, _realtimeTableName); return instancesAssigned; @@ -141,9 +143,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { * Helper method to assign instances for CONSUMING segment based on the segment partition id and instance partitions. */ private List<String> assignConsumingSegment(String segmentName, InstancePartitions instancePartitions) { - int partitionGroupId = - SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn); - + int partitionGroupId = getPartitionGroupId(segmentName); int numReplicaGroups = instancePartitions.getNumReplicaGroups(); if (numReplicaGroups == 1) { // Non-replica-group based assignment: @@ -158,7 +158,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { int numInstances = instances.size(); List<String> instancesAssigned = new ArrayList<>(_replication); for (int replicaId = 0; replicaId < _replication; replicaId++) { - instancesAssigned.add(instances.get((partitionGroupId * _replication + replicaId) % numInstances)); + int instanceIndex = (partitionGroupId * _replication + replicaId) % numInstances; + instancesAssigned.add(instances.get(instanceIndex)); } return instancesAssigned; } else { @@ -185,8 +186,9 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED); InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); - Preconditions.checkState(consumingInstancePartitions != null, - "Failed to find COMPLETED or CONSUMING instance partitions for table: %s", _realtimeTableName); + Preconditions + .checkState(consumingInstancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", + _realtimeTableName); Preconditions.checkState(consumingInstancePartitions.getNumPartitions() == 1, "Instance partitions: %s should contain 1 partition", consumingInstancePartitions.getInstancePartitionsName()); boolean includeConsuming = config @@ -331,8 +333,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { int numPartitions = instancePartitions.getNumPartitions(); Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>(); for (String segmentName : currentAssignment.keySet()) { - int partitionGroupId = SegmentUtils - .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn); + int partitionGroupId = getPartitionGroupId(segmentName); int instancePartitionId = partitionGroupId % numPartitions; instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()) .add(segmentName); @@ -368,12 +369,21 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { // Replica-group based assignment // Uniformly spray the segment partitions over the instance partitions - int segmentPartitionId = - SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn); - int numPartitions = instancePartitions.getNumPartitions(); - int partitionGroupId = segmentPartitionId % numPartitions; - return SegmentAssignmentUtils - .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId); + int partitionId = getPartitionGroupId(segmentName) % instancePartitions.getNumPartitions(); + return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId); + } + } + + private int getPartitionGroupId(String segmentName) { + Integer segmentPartitionId = + SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn); + if (segmentPartitionId == null) { + // This case is for the uploaded segments for which there's no partition information. + // A random, but consistent, partition id is calculated based on the hash code of the segment name. + // Note that '% 10K' is used to prevent having partition ids with large value which will be problematic later in + // instance assignment formula. + segmentPartitionId = Math.abs(segmentName.hashCode() % 10_000); } + return segmentPartitionId; } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index 1fe98d8eba..19d5235d77 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -18,11 +18,16 @@ */ package org.apache.pinot.controller.helix.core.assignment.segment; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.commons.configuration.BaseConfiguration; +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.config.table.TableConfig; @@ -34,6 +39,11 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -71,7 +81,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) .setLLC(true).build(); - _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, tableConfig); + _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig); _instancePartitionsMap = new TreeMap<>(); // CONSUMING instances: @@ -102,11 +112,13 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { @Test public void testAssignSegment() { + Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); List<String> instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); + _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); // Segment 0 (partition 0) should be assigned to instance 0, 1, 2 @@ -128,11 +140,13 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { @Test public void testRelocateCompletedSegments() { + Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); List<String> instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); + _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); addToAssignment(currentAssignment, segmentId, instancesAssigned); } @@ -150,10 +164,31 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { SegmentStateModel.OFFLINE); currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); + // Add an uploaded ONLINE segment to the consuming instances (i.e. no separation between consuming & completed) + List<String> uploadedSegmentNames = ImmutableList.of("UploadedSegment1", "UploadedSegment2"); + onlyConsumingInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); + for (String uploadedSegName : uploadedSegmentNames) { + List<String> instancesAssigned = + _segmentAssignment.assignSegment(uploadedSegName, currentAssignment, onlyConsumingInstancePartitionMap); + currentAssignment.put(uploadedSegName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); + } + // Now there should be 103 segments assigned + assertEquals(currentAssignment.size(), NUM_SEGMENTS + uploadedSegmentNames.size() + 1); + // Each segment should have 3 replicas and all assigned instances should be prefixed with consuming + currentAssignment.forEach((type, instanceStateMap) -> { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + instanceStateMap.forEach((instance, state) -> { + if (!instance.startsWith("badInstance_")) { + assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); + } + }); + }); + // Rebalance without COMPLETED instance partitions should not change the segment assignment - Map<InstancePartitionsType, InstancePartitions> noRelocationInstancePartitionsMap = new TreeMap<>(); - noRelocationInstancePartitionsMap - .put(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); + Map<InstancePartitionsType, InstancePartitions> noRelocationInstancePartitionsMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); assertEquals(_segmentAssignment .rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, null, null, new BaseConfiguration()), currentAssignment); @@ -162,29 +197,36 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { // instances Map<String, Map<String, String>> newAssignment = _segmentAssignment .rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new BaseConfiguration()); - assertEquals(newAssignment.size(), NUM_SEGMENTS + 1); + assertEquals(newAssignment.size(), NUM_SEGMENTS + uploadedSegmentNames.size() + 1); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { - // COMPLETED (ONLINE) segments - Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); - for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { - assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), SegmentStateModel.ONLINE); - } + // check COMPLETED (ONLINE) segments + newAssignment.get(_segments.get(segmentId)).forEach((instance, state) -> { + assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); + assertEquals(state, SegmentStateModel.ONLINE); + }); } else { - // CONSUMING segments - Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); - for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { - assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); - } + // check CONSUMING segments + newAssignment.get(_segments.get(segmentId)).forEach((instance, state) -> { + assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); + assertEquals(state, SegmentStateModel.CONSUMING); + }); } } - // Relocated segments should be balanced (each instance should have at least 28 segments assigned) + // check the uploaded segments + for (String uploadedSegName : uploadedSegmentNames) { + newAssignment.get(uploadedSegName).forEach((instance, state) -> { + assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); + assertEquals(state, SegmentStateModel.ONLINE); + }); + } + + // Relocated segments should be balanced (each instance should have at least 29 segments assigned) int[] numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, COMPLETED_INSTANCES); assertEquals(numSegmentsAssignedPerInstance.length, NUM_COMPLETED_INSTANCES); - int expectedMinNumSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS / NUM_COMPLETED_INSTANCES; + int expectedMinNumSegmentsPerInstance = + (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS / NUM_COMPLETED_INSTANCES + 1; for (int i = 0; i < NUM_COMPLETED_INSTANCES; i++) { assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance); } @@ -232,6 +274,28 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { } } + @Test + public void testAssignSegmentForUploadedSegments() { + // CONSUMING instance partition has been tested in previous method, only test COMPLETED here + Map<InstancePartitionsType, InstancePartitions> onlyCompletedInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.COMPLETED, _instancePartitionsMap.get(InstancePartitionsType.COMPLETED)); + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + Map<String, List<String>> expectedUploadedSegmentToInstances = ImmutableMap.of( + "uploadedSegment_0", ImmutableList.of("completedInstance_0", "completedInstance_1", "completedInstance_2"), + "uploadedSegment_1", ImmutableList.of("completedInstance_3", "completedInstance_4", "completedInstance_5"), + "uploadedSegment_2", ImmutableList.of("completedInstance_6", "completedInstance_7", "completedInstance_8"), + "uploadedSegment_3", ImmutableList.of("completedInstance_9", "completedInstance_0", "completedInstance_1"), + "uploadedSegment_4", ImmutableList.of("completedInstance_2", "completedInstance_3", "completedInstance_4") + ); + expectedUploadedSegmentToInstances.forEach((segmentName, expectedInstances) -> { + List<String> actualInstances = + _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyCompletedInstancePartitionMap); + assertEquals(actualInstances, expectedInstances); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(actualInstances, SegmentStateModel.ONLINE)); + }); + } + private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, List<String> instancesAssigned) { // Change the state of the last segment in the same partition from CONSUMING to ONLINE if exists @@ -246,4 +310,12 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); } + + private HelixManager createHelixManager() { + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new ZNRecord("0")); + return helixManager; + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java index 1724d0aeb8..97aa23aab2 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.helix.core.assignment.segment; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.HashMap; @@ -162,11 +163,13 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest { @Test public void testRelocateCompletedSegments() { + Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); List<String> instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); + _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); addToAssignment(currentAssignment, segmentId, instancesAssigned); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index 09cd07bc9e..ba8929d267 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -18,12 +18,16 @@ */ package org.apache.pinot.controller.helix.core.assignment.segment; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.commons.configuration.BaseConfiguration; +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.config.table.TableConfig; @@ -36,6 +40,11 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -74,7 +83,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) .setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY) .build(); - _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, tableConfig); + _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig); _instancePartitionsMap = new TreeMap<>(); // CONSUMING instances: @@ -116,19 +125,16 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions); } - @Test - public void testFactory() { - assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment); - } - @Test public void testAssignSegment() { + Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); List<String> instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); + _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); // Segment 0 (partition 0) should be assigned to instance 0, 3, 6 @@ -151,11 +157,13 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { @Test public void testRelocateCompletedSegments() { + Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); List<String> instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); + _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); addToAssignment(currentAssignment, segmentId, instancesAssigned); } @@ -173,10 +181,31 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { SegmentStateModel.OFFLINE); currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); + // Add 3 uploaded ONLINE segments to the consuming instances (i.e. no separation between consuming & completed) + List<String> uploadedSegmentNames = ImmutableList.of("UploadedSegment0", "UploadedSegment1", "UploadedSegment2"); + onlyConsumingInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); + for (String uploadedSegName : uploadedSegmentNames) { + List<String> instancesAssigned = + _segmentAssignment.assignSegment(uploadedSegName, currentAssignment, onlyConsumingInstancePartitionMap); + currentAssignment.put(uploadedSegName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); + } + + assertEquals(currentAssignment.size(), NUM_SEGMENTS + uploadedSegmentNames.size() + 1); + // Each segment should have 3 replicas and all assigned instances should be prefixed with consuming + currentAssignment.forEach((type, instanceStateMap) -> { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + instanceStateMap.forEach((instance, state) -> { + if (!instance.startsWith("badInstance_")) { + assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); + } + }); + }); + // Rebalance without COMPLETED instance partitions should not change the segment assignment - Map<InstancePartitionsType, InstancePartitions> noRelocationInstancePartitionsMap = new TreeMap<>(); - noRelocationInstancePartitionsMap - .put(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); + Map<InstancePartitionsType, InstancePartitions> noRelocationInstancePartitionsMap = + ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); assertEquals(_segmentAssignment .rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, null, null, new BaseConfiguration()), currentAssignment); @@ -185,31 +214,38 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { // instances Map<String, Map<String, String>> newAssignment = _segmentAssignment .rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, new BaseConfiguration()); - assertEquals(newAssignment.size(), NUM_SEGMENTS + 1); + assertEquals(newAssignment.size(), NUM_SEGMENTS + uploadedSegmentNames.size() + 1); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { - // COMPLETED (ONLINE) segments - Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); - for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { - assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), SegmentStateModel.ONLINE); - } + // check COMPLETED (ONLINE) segments + newAssignment.get(_segments.get(segmentId)).forEach((instance, state) -> { + assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); + assertEquals(state, SegmentStateModel.ONLINE); + }); } else { - // CONSUMING segments - Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); - for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { - assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); - } + // check CONSUMING segments + newAssignment.get(_segments.get(segmentId)).forEach((instance, state) -> { + assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); + assertEquals(state, SegmentStateModel.CONSUMING); + }); } } - // Relocated segments should be balanced (each instance should have 24 segments assigned) + // check the uploaded segments + for (String uploadedSegName : uploadedSegmentNames) { + newAssignment.get(uploadedSegName).forEach((instance, state) -> { + assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); + assertEquals(state, SegmentStateModel.ONLINE); + }); + } + + // Relocated segments should be balanced int[] numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, COMPLETED_INSTANCES); - int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_COMPLETED_INSTANCES]; - int numSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS / NUM_COMPLETED_INSTANCES; - Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); - assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + int expectedNumSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS / NUM_COMPLETED_INSTANCES; + for (int actualNumSegments : numSegmentsAssignedPerInstance) { + assertTrue(actualNumSegments == expectedNumSegmentsPerInstance + || actualNumSegments == expectedNumSegmentsPerInstance + 1); + } // Rebalance with COMPLETED instance partitions including CONSUMING segments should give the same assignment BaseConfiguration rebalanceConfig = new BaseConfiguration(); @@ -256,6 +292,34 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { } } + @Test + public void testAssignSegmentForUploadedSegments() { + // CONSUMING instance partition has been tested in previous method, only test COMPLETED here + Map<InstancePartitionsType, InstancePartitions> onlyCompletedInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.COMPLETED, _instancePartitionsMap.get(InstancePartitionsType.COMPLETED)); + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + // COMPLETED instances: + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3], + // 0_1=[instance_4, instance_5, instance_6, instance_7], + // 0_2=[instance_8, instance_9, instance_10, instance_11] + // } + Map<String, List<String>> expectedUploadedSegmentToInstances = ImmutableMap.of( + "uploadedSegment_0", ImmutableList.of("completedInstance_0", "completedInstance_4", "completedInstance_8"), + "uploadedSegment_1", ImmutableList.of("completedInstance_1", "completedInstance_5", "completedInstance_9"), + "uploadedSegment_2", ImmutableList.of("completedInstance_2", "completedInstance_6", "completedInstance_10"), + "uploadedSegment_3", ImmutableList.of("completedInstance_3", "completedInstance_7", "completedInstance_11"), + "uploadedSegment_4", ImmutableList.of("completedInstance_0", "completedInstance_4", "completedInstance_8") + ); + expectedUploadedSegmentToInstances.forEach((segmentName, expectedInstances) -> { + List<String> actualInstances = + _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyCompletedInstancePartitionMap); + assertEquals(actualInstances, expectedInstances); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(actualInstances, SegmentStateModel.ONLINE)); + }); + } + private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, List<String> instancesAssigned) { // Change the state of the last segment in the same partition from CONSUMING to ONLINE if exists @@ -270,4 +334,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); } + + private HelixManager createHelixManager() { + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new ZNRecord("0")); + return helixManager; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 6276db12c0..5b28f04fd3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -189,7 +189,8 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) throws Exception { - throw new UnsupportedOperationException(); + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); + addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema)); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java index e3cf839b61..7e17da42cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java @@ -18,13 +18,8 @@ */ package org.apache.pinot.core.data.manager.offline; -import java.io.File; import javax.annotation.concurrent.ThreadSafe; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.core.data.manager.BaseTableDataManager; -import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; -import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; -import org.apache.pinot.spi.data.Schema; /** @@ -44,11 +39,4 @@ public class OfflineTableDataManager extends BaseTableDataManager { @Override protected void doShutdown() { } - - @Override - public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) - throws Exception { - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema)); - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 730bb81752..198023632c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -369,8 +369,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private void handleUpsert(ImmutableSegmentImpl immutableSegment) { String segmentName = immutableSegment.getSegmentName(); - int partitionGroupId = SegmentUtils + Integer partitionGroupId = SegmentUtils .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0)); + Preconditions.checkNotNull(partitionGroupId, String + .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName, + _tableNameWithType)); PartitionUpsertMetadataManager partitionUpsertMetadataManager = _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId); ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 864f6b7287..7fb120cfb7 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -21,16 +21,24 @@ package org.apache.pinot.integration.tests; import com.fasterxml.jackson.databind.JsonNode; import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; +import org.apache.http.HttpStatus; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.ReadMode; @@ -42,6 +50,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -103,6 +112,85 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio } } + @Override + protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig) + throws Exception { + if (!_tarDir.exists()) { + _tarDir.mkdir(); + } + if (!_segmentDir.exists()) { + _segmentDir.mkdir(); + } + + // create segments out of the avro files (segments will be placed in _tarDir) + List<File> copyOfAvroFiles = new ArrayList<>(avroFiles); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + + // upload segments to controller + uploadSegmentsToController(getTableName(), _tarDir, false, false); + + // upload the first segment again to verify refresh + uploadSegmentsToController(getTableName(), _tarDir, true, false); + + // upload the first segment again to verify refresh with different segment crc + uploadSegmentsToController(getTableName(), _tarDir, true, true); + + // add avro files to the original list so H2 will have the uploaded data as well + avroFiles.addAll(copyOfAvroFiles); + } + + private void uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc) + throws Exception { + File[] segmentTarFiles = tarDir.listFiles(); + assertNotNull(segmentTarFiles); + int numSegments = segmentTarFiles.length; + assertTrue(numSegments > 0); + if (onlyFirstSegment) { + numSegments = 1; + } + URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort); + try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { + if (numSegments == 1) { + File segmentTarFile = segmentTarFiles[0]; + if (changeCrc) { + changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString()); + } + assertEquals(fileUploadDownloadClient + .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, + TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK); + } else { + // Upload segments in parallel + ExecutorService executorService = Executors.newFixedThreadPool(numSegments); + List<Future<Integer>> futures = new ArrayList<>(numSegments); + for (File segmentTarFile : segmentTarFiles) { + futures.add(executorService.submit(() -> fileUploadDownloadClient + .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, + TableType.REALTIME).getStatusCode())); + } + executorService.shutdown(); + for (Future<Integer> future : futures) { + assertEquals((int) future.get(), HttpStatus.SC_OK); + } + } + } + } + + private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) { + int startIdx = segmentFilePath.indexOf("mytable_"); + int endIdx = segmentFilePath.indexOf(".tar.gz"); + String segmentName = segmentFilePath.substring(startIdx, endIdx); + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName); + segmentZKMetadata.setCrc(111L); + _helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata); + } + + @Override + protected long getCountStarResult() { + // all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint + return super.getCountStarResult() * 2; + } + @BeforeClass @Override public void setUp() @@ -138,9 +226,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); List<SegmentZKMetadata> segmentsZKMetadata = ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName); - for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { - assertEquals(segmentZKMetadata.getSizeThresholdToFlushSegment(), - getRealtimeSegmentFlushSize() / getNumKafkaPartitions()); + for (SegmentZKMetadata segMetadata : segmentsZKMetadata) { + if (segMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.UPLOADED) { + assertEquals(segMetadata.getSizeThresholdToFlushSegment(), + getRealtimeSegmentFlushSize() / getNumKafkaPartitions()); + } } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java index 6208c36c5b..dfa7a34d82 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.List; import java.util.Random; import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -55,12 +57,17 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe List<File> avroFiles = unpackAvroData(_tempDir); // Create and upload the schema and table config - addSchema(createSchema()); - addTableConfig(createRealtimeTableConfig(avroFiles.get(0))); + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0)); + addTableConfig(tableConfig); // Push data into Kafka pushAvroIntoKafka(avroFiles); + // create segments and upload them to controller + createSegmentsAndUpload(avroFiles, schema, tableConfig); + // Set up the H2 connection setUpH2Connection(avroFiles); @@ -71,6 +78,11 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe waitForAllDocsLoaded(600_000L); } + protected void createSegmentsAndUpload(List<File> avroFile, Schema schema, TableConfig tableConfig) + throws Exception { + // Do nothing. This is specific to LLC use cases for now. + } + @Override protected void overrideServerConf(PinotConfiguration configuration) { configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, false); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org