This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 96621f415385649b4f2ec5ea1828723eed4baa7b Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Thu Dec 31 12:04:54 2020 -0800 Controller side code --- .../segment/RealtimeSegmentZKMetadata.java | 6 - .../helix/core/PinotHelixResourceManager.java | 88 ++++----- .../helix/core/PinotTableIdealStateBuilder.java | 9 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 201 +++++++++++++-------- .../fakestream/FakePartitionGroupMetadata.java | 48 ----- .../kafka09/KafkaPartitionGroupMetadata.java | 48 ----- .../kafka20/KafkaPartitionGroupMetadata.java | 48 ----- .../pinot/spi/stream/PartitionGroupMetadata.java | 52 +++++- 8 files changed, 207 insertions(+), 293 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java index c46af53..d88be18 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java @@ -35,7 +35,6 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata { private Status _status = null; private int _sizeThresholdToFlushSegment = -1; private String _timeThresholdToFlushSegment = null; // store as period string for readability - private String _partitionGroupMetadataStr = null; public RealtimeSegmentZKMetadata() { setSegmentType(SegmentType.REALTIME); @@ -50,7 +49,6 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata { if (flushThresholdTime != null && !flushThresholdTime.equals(NULL)) { _timeThresholdToFlushSegment = znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_TIME); } - _partitionGroupMetadataStr = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_GROUP_METADATA); } @Override @@ -143,8 +141,4 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata { public void setTimeThresholdToFlushSegment(String timeThresholdPeriodString) { _timeThresholdToFlushSegment = timeThresholdPeriodString; } - - public String getPartitionGroupMetadataStr() { - return _partitionGroupMetadataStr; - } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index a04e0bc..1f36e4f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -127,6 +127,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; @@ -1355,65 +1356,50 @@ public class PinotHelixResourceManager { IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig)); IdealState idealState = getTableIdealState(realtimeTableName); + if (streamConfig.isShardedConsumerType()) { - setupShardedRealtimeTable(streamConfig, idealState, realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber()); - } + idealState = PinotTableIdealStateBuilder + .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, + _enableBatchMessageMode); + _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); + LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); + _pinotLLCRealtimeSegmentManager.setupNewShardedTable(rawRealtimeTableConfig, idealState); + } else { - if (streamConfig.hasHighLevelConsumerType()) { - if (idealState == null) { - LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName); - idealState = PinotTableIdealStateBuilder - .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager, - _propertyStore, _enableBatchMessageMode); - _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState); - } else { - // Remove LLC segments if it is not configured - if (!streamConfig.hasLowLevelConsumerType()) { - _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState); + if (streamConfig.hasHighLevelConsumerType()) { + if (idealState == null) { + LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName); + idealState = PinotTableIdealStateBuilder + .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager, + _propertyStore, _enableBatchMessageMode); + _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState); + } else { + // Remove LLC segments if it is not configured + if (!streamConfig.hasLowLevelConsumerType()) { + _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState); + } } + // For HLC table, property store entry must exist to trigger watchers to create segments + ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName); } - // For HLC table, property store entry must exist to trigger watchers to create segments - ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName); - } - - // Either we have only low-level consumer, or both. - if (streamConfig.hasLowLevelConsumerType()) { - // Will either create idealstate entry, or update the IS entry with new segments - // (unless there are low-level segments already present) - if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) { - PinotTableIdealStateBuilder - .buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, realtimeTableConfig, - idealState, _enableBatchMessageMode); - LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); - } else { - LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); + + // Either we have only low-level consumer, or both. + if (streamConfig.hasLowLevelConsumerType()) { + // Will either create idealstate entry, or update the IS entry with new segments + // (unless there are low-level segments already present) + if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) { + idealState = PinotTableIdealStateBuilder + .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, + _enableBatchMessageMode); + _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); + LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); + } else { + LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); + } } } } - /** - * Sets up the realtime table ideal state - * @param streamConfig - */ - private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) { - StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - StreamMetadataProvider streamMetadataProvider = streamConsumerFactory - .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis()); - - // get current partition groups and their metadata - this will be empty when creating the table - List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState); - - // get new partition groups and their metadata, - // Assume table has 3 shards. Say we get [0], [1], [2] groups (for now assume that each group contains only 1 shard) - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 5000); - - // setup segment zk metadata and ideal state for all the new found partition groups - _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas); - } - - - private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) { String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName); if (!_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 1e95966..a7b3c9e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -94,9 +94,8 @@ public class PinotTableIdealStateBuilder { return idealState; } - public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, - String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState, - boolean enableBatchMessageMode) { + public static IdealState buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig, + IdealState idealState, boolean enableBatchMessageMode) { // Validate replicasPerPartition here. final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition(); @@ -105,7 +104,7 @@ public class PinotTableIdealStateBuilder { } final int nReplicas; try { - nReplicas = Integer.valueOf(replicasPerPartitionStr); + nReplicas = Integer.parseInt(replicasPerPartitionStr); } catch (NumberFormatException e) { throw new PinotHelixResourceManager.InvalidTableConfigException( "Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e); @@ -113,7 +112,7 @@ public class PinotTableIdealStateBuilder { if (idealState == null) { idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode); } - pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); + return idealState; } public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 189be8b..9b03fa4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; @@ -44,6 +45,7 @@ import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMeter; @@ -161,82 +163,84 @@ public class PinotLLCRealtimeSegmentManager { _flushThresholdUpdateManager = new FlushThresholdUpdateManager(); } + /** - * The committing segment will call this. - * - * For example, say we have 3 shards, grouped into PartitionGroups as [0], [1], [2] - * Now segment of PG (partition group) 0 is committing. First, we'll update the metadata to DONE, and ideal state to ONLINE - * Then, the currentPartitionGroupMetadata list will contain - [1], [2] - * The newPartitionGroupMetadata list will contain - [0], [1], [2] - * We then get the set of PGs for which new segments need to be made - [0] + * Using the ideal state and segment metadata, return a list of the current partition groups */ - public void commitPartitionGroup(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { - TableConfig realtimeTableConfig = getTableConfig(realtimeTableName); - StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig)); - int numReplicas = realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber(); - IdealState idealState = getIdealState(realtimeTableName); - - // update status in segment metadata to DONE - // .. - - // update Ideal State for this segment to ONLINE - // .. - - // fetch current partition groups (which are actively CONSUMING - from example above, [1], [2]) - List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); - - // get new partition groups (honor any groupings which are already consuming - [0], [1], [2]) - StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - StreamMetadataProvider streamMetadataProvider = streamConsumerFactory - .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis()); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000); - - // from the above list, remove the partition groups which are already CONSUMING - // i.e. newPartitionGroups - currentPartitionGroups. Therefore, ([0], [1], [2]) - ([1], [2]) = ([0]) - // .. - - // setup segment metadata and ideal state for the new found partition groups - setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas); - } + public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) { + List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(); - public void setupIdealStateForConsuming(List<SegmentZKMetadata> segmentZKMetadata, int numReplicas) { - // add all segments from the list to ideal state, with state CONSUMING - } + // from all segment names in the ideal state, find unique groups + Map<Integer, LLCSegmentName> groupIdToLatestSegment = new HashMap<>(); + for (String segment : idealState.getPartitionSet()) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segment); + int partitionGroupId = llcSegmentName.getPartitionGroupId(); + groupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> { + if (latestSegment == null) { + return llcSegmentName; + } else { + return latestSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber() ? latestSegment + : llcSegmentName; + } + }); + } - public void persistSegmentMetadata(List<SegmentZKMetadata> segmentMetadata) { - // persist new segment metadata from list to zk + // create a PartitionGroupMetadata for each latest segment + for (Map.Entry<Integer, LLCSegmentName> entry : groupIdToLatestSegment.entrySet()) { + int partitionGroupId = entry.getKey(); + LLCSegmentName llcSegmentName = entry.getValue(); + RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = ZKMetadataProvider + .getRealtimeSegmentZKMetadata(_propertyStore, llcSegmentName.getTableName(), llcSegmentName.getSegmentName()); + Preconditions.checkNotNull(realtimeSegmentZKMetadata); + LLCRealtimeSegmentZKMetadata llRealtimeSegmentZKMetadata = + (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata; + PartitionGroupMetadata partitionGroupMetadata = + new PartitionGroupMetadata(partitionGroupId, llcSegmentName.getSequenceNumber(), + llRealtimeSegmentZKMetadata.getStartOffset(), llRealtimeSegmentZKMetadata.getEndOffset(), + llRealtimeSegmentZKMetadata.getStatus().toString()); + partitionGroupMetadataList.add(partitionGroupMetadata); + } + return partitionGroupMetadataList; } /** - * Using the list of partition group metadata, create a list of equivalent segment zk metadata + * Sets up the realtime table ideal state for a table of consumer type SHARDED */ - public List<SegmentZKMetadata> constructSegmentMetadata(List<PartitionGroupMetadata> partitionGroupMetadataList) { - List<SegmentZKMetadata> segmentZKMetadata = new ArrayList<>(); - // for each partition group construct a segment zk metadata object - return segmentZKMetadata; - } + public void setupNewShardedTable(TableConfig tableConfig, IdealState idealState) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); - /** - * Using the ideal state, return a list of the current partition groups - */ - public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) { - List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(); - // from all segment names in the ideal state, find unique groups + String realtimeTableName = tableConfig.getTableName(); + LOGGER.info("Setting up new SHARDED table: {}", realtimeTableName); - // create a PartitionGroupMetadata, one for each group - return partitionGroupMetadataList; - } + _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName); - public void setupNewPartitionGroups(List<PartitionGroupMetadata> newPartitionGroupMetadataList, int numReplicas) { - // construct segment zk metadata for the new partition groups - List<SegmentZKMetadata> segmentMetadata = constructSegmentMetadata(newPartitionGroupMetadataList); + PartitionLevelStreamConfig streamConfig = + new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + + // get new partition groups and their metadata + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + StreamMetadataProvider streamMetadataProvider = streamConsumerFactory + .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis()); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000); + int numPartitionGroups = newPartitionGroupMetadataList.size(); + + InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); + int numReplicas = getNumReplicas(tableConfig, instancePartitions); - // create these new segments metadata - persistSegmentMetadata(segmentMetadata); + SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); - // setup ideal state for the new segments - setupIdealStateForConsuming(segmentMetadata, numReplicas); + long currentTimeMs = getCurrentTimeMs(); + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { + String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata.getPartitionGroupId(), + currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, + instancePartitionsMap); + } + setIdealState(realtimeTableName, idealState); } public boolean getIsSplitCommitEnabled() { @@ -532,13 +536,50 @@ public class PinotLLCRealtimeSegmentManager { _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true); // Step-2 + + // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up + // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS] + List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); + StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + StreamMetadataProvider streamMetadataProvider = streamConsumerFactory + .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis()); + // find new partition groups [A],[B],[C],[D] + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000); + + // create new segment metadata, only if it is not IN_PROGRESS in the current state + Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect( + Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p)); + + List<String> newConsumingSegmentNames = new ArrayList<>(); + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); - LLCSegmentName newLLCSegmentName = - getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, - new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)), - newLLCSegmentName, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, - instancePartitions, numPartitions, numReplicas); + for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { + int newPartitionGroupId = partitionGroupMetadata.getPartitionGroupId(); + PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId); + if (currentPartitionGroupMetadata == null) { // not present in current state + // make new segment + LLCSegmentName newLLCSegmentName = + new LLCSegmentName(rawTableName, newPartitionGroupId, STARTING_SEQUENCE_NUMBER, newSegmentCreationTimeMs); + createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs, + committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); + newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName()); + } else { + String currentStatus = currentPartitionGroupMetadata.getStatus(); + if (!currentStatus.equals(Status.IN_PROGRESS.toString())) { // not IN_PROGRESS anymore in current state + // make new segment + LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId, + currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs); + createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs, + committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); + newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName()); + } + } + } + // Step-3 SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); @@ -554,7 +595,7 @@ public class PinotLLCRealtimeSegmentManager { Lock lock = _idealStateUpdateLocks[lockIndex]; try { lock.lock(); - updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newLLCSegmentName.getSegmentName(), + updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentNames, segmentAssignment, instancePartitionsMap); } finally { lock.unlock(); @@ -845,7 +886,7 @@ public class PinotLLCRealtimeSegmentManager { */ @VisibleForTesting void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, - String newSegmentName, SegmentAssignment segmentAssignment, + List<String> newSegmentNames, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { assert idealState != null; @@ -862,14 +903,18 @@ public class PinotLLCRealtimeSegmentManager { "Exceeded max segment completion time for segment " + committingSegmentName); } updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName, - newSegmentName, segmentAssignment, instancePartitionsMap); + null, segmentAssignment, instancePartitionsMap); + for (String newSegmentName : newSegmentNames) { + updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), null, + newSegmentName, segmentAssignment, instancePartitionsMap); + } return idealState; }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)); } @VisibleForTesting void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap, - @Nullable String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, + @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { if (committingSegmentName != null) { // Change committing segment state to ONLINE @@ -899,11 +944,11 @@ public class PinotLLCRealtimeSegmentManager { } } // Assign instances to the new segment and add instances as state CONSUMING - List<String> instancesAssigned = - segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); - instanceStatesMap.put(newSegmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); - LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); + if (newSegmentName != null) { + List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); + instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); + LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); + } } /* diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java deleted file mode 100644 index 78ee12c..0000000 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.pinot.core.realtime.impl.fakestream; - -import org.apache.pinot.spi.stream.Checkpoint; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; - - -public class FakePartitionGroupMetadata implements PartitionGroupMetadata { - - private final int _groupId; - public FakePartitionGroupMetadata(int groupId) { - _groupId = groupId; - } - - @Override - public int getGroupId() { - return getGroupId(); - } - - @Override - public Checkpoint getStartCheckpoint() { - return null; - } - - @Override - public Checkpoint getEndCheckpoint() { - return null; - } - - @Override - public void setStartCheckpoint(Checkpoint startCheckpoint) { - - } - - @Override - public void setEndCheckpoint(Checkpoint endCheckpoint) { - - } - - @Override - public byte[] serialize() { - return new byte[0]; - } - - @Override - public PartitionGroupMetadata deserialize(byte[] blob) { - return null; - } -} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java deleted file mode 100644 index 1d792ac..0000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.pinot.plugin.stream.kafka09; - -import org.apache.pinot.spi.stream.Checkpoint; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; - - -public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata { - - private final int _groupId; - public KafkaPartitionGroupMetadata(int partitionId) { - _groupId = partitionId; - } - - @Override - public int getGroupId() { - return _groupId; - } - - @Override - public Checkpoint getStartCheckpoint() { - return null; - } - - @Override - public Checkpoint getEndCheckpoint() { - return null; - } - - @Override - public void setStartCheckpoint(Checkpoint startCheckpoint) { - - } - - @Override - public void setEndCheckpoint(Checkpoint endCheckpoint) { - - } - - @Override - public byte[] serialize() { - return new byte[0]; - } - - @Override - public PartitionGroupMetadata deserialize(byte[] blob) { - return null; - } -} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java deleted file mode 100644 index 31ae75a..0000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.pinot.plugin.stream.kafka20; - -import org.apache.pinot.spi.stream.Checkpoint; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; - - -public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata { - - private final int _groupId; - public KafkaPartitionGroupMetadata(int partitionId) { - _groupId = partitionId; - } - - @Override - public int getGroupId() { - return _groupId; - } - - @Override - public Checkpoint getStartCheckpoint() { - return null; - } - - @Override - public Checkpoint getEndCheckpoint() { - return null; - } - - @Override - public void setStartCheckpoint(Checkpoint startCheckpoint) { - - } - - @Override - public void setEndCheckpoint(Checkpoint endCheckpoint) { - - } - - @Override - public byte[] serialize() { - return new byte[0]; - } - - @Override - public PartitionGroupMetadata deserialize(byte[] blob) { - return null; - } -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java index 0f44173..f662d99 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java @@ -18,22 +18,56 @@ */ package org.apache.pinot.spi.stream; -import java.util.List; +public class PartitionGroupMetadata { + // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider) + private final int _partitionGroupId; + private int _sequenceNumber; + private String _startCheckpoint; + private String _endCheckpoint; + private String _status; -public interface PartitionGroupMetadata { + public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, String startCheckpoint, + String endCheckpoint, String status) { + _partitionGroupId = partitionGroupId; + _sequenceNumber = sequenceNumber; + _startCheckpoint = startCheckpoint; + _endCheckpoint = endCheckpoint; + } - int getGroupId(); + public void setSequenceNumber(int sequenceNumber) { + _sequenceNumber = sequenceNumber; + } - Checkpoint getStartCheckpoint(); // similar to getStartOffset + public void setStartCheckpoint(String startCheckpoint) { + _startCheckpoint = startCheckpoint; + } - Checkpoint getEndCheckpoint(); // similar to getEndOffset + public void setEndCheckpoint(String endCheckpoint) { + _endCheckpoint = endCheckpoint; + } - void setStartCheckpoint(Checkpoint startCheckpoint); + public int getPartitionGroupId() { + return _partitionGroupId; + } - void setEndCheckpoint(Checkpoint endCheckpoint); + public int getSequenceNumber() { + return _sequenceNumber; + } - byte[] serialize(); + public String getStartCheckpoint() { + return _startCheckpoint; + } - PartitionGroupMetadata deserialize(byte[] blob); + public String getEndCheckpoint() { + return _endCheckpoint; + } + + public String getStatus() { + return _status; + } + + public void setStatus(String status) { + _status = status; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org