This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6502a7d29214cc1ad3bdf9feba169192d1c50ff7 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Wed Dec 23 17:08:08 2020 -0800 Controller side changes pseudo code --- .../helix/core/PinotHelixResourceManager.java | 29 ++++++++ .../realtime/PinotLLCRealtimeSegmentManager.java | 79 ++++++++++++++++++++++ .../org/apache/pinot/spi/stream/Checkpoint.java | 24 +++++++ .../org/apache/pinot/spi/stream/FetchResult.java | 27 ++++++++ .../pinot/spi/stream/PartitionGroupConsumer.java | 23 +++++++ .../pinot/spi/stream/PartitionGroupMetadata.java | 41 +++++++++++ .../spi/stream/PartitionGroupMetadataList.java | 30 ++++++++ .../org/apache/pinot/spi/stream/StreamConfig.java | 6 +- .../pinot/spi/stream/StreamConsumerFactory.java | 9 ++- 9 files changed, 266 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ebf07d8..fa117fa 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -88,6 +88,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; @@ -125,7 +126,10 @@ import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -1350,6 +1354,10 @@ public class PinotHelixResourceManager { IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig)); IdealState idealState = getTableIdealState(realtimeTableName); + if (streamConfig.isShardedConsumerType()) { + setupShardedRealtimeTable(streamConfig, idealState, realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber()); + } + if (streamConfig.hasHighLevelConsumerType()) { if (idealState == null) { LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName); @@ -1382,6 +1390,27 @@ public class PinotHelixResourceManager { } } + /** + * Sets up the realtime table ideal state + * @param streamConfig + */ + private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) { + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + + // get current partition groups and their metadata - this will be empty when creating the table + List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState); + + // get new partition groups and their metadata, + // Assume table has 3 shards. Say we get [0], [1], [2] groups (for now assume that each group contains only 1 shard) + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList); + + // setup segment zk metadata and ideal state for all the new found partition groups + _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas); + } + + + private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) { String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName); if (!_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 42d50d0..8a29489 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -45,6 +45,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; @@ -75,10 +76,12 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.PartitionOffsetFetcher; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; @@ -157,6 +160,82 @@ public class PinotLLCRealtimeSegmentManager { _flushThresholdUpdateManager = new FlushThresholdUpdateManager(); } + /** + * The committing segment will call this. + * + * For example, say we have 3 shards, grouped into PartitionGroups as [0], [1], [2] + * Now segment of PG (partition group) 0 is committing. First, we'll update the metadata to DONE, and ideal state to ONLINE + * Then, the currentPartitionGroupMetadata list will contain - [1], [2] + * The newPartitionGroupMetadata list will contain - [0], [1], [2] + * We then get the set of PGs for which new segments need to be made - [0] + */ + public void commitPartitionGroup(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { + TableConfig realtimeTableConfig = getTableConfig(realtimeTableName); + StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig)); + int numReplicas = realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber(); + IdealState idealState = getIdealState(realtimeTableName); + + // update status in segment metadata to DONE + // .. + + // update Ideal State for this segment to ONLINE + // .. + + // fetch current partition groups (which are actively CONSUMING - from example above, [1], [2]) + List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); + + // get new partition groups (honor any groupings which are already consuming - [0], [1], [2]) + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList); + + // from the above list, remove the partition groups which are already CONSUMING + // i.e. newPartitionGroups - currentPartitionGroups. Therefore, ([0], [1], [2]) - ([1], [2]) = ([0]) + // .. + + // setup segment metadata and ideal state for the new found partition groups + setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas); + } + + public void setupIdealStateForConsuming(List<SegmentZKMetadata> segmentZKMetadata, int numReplicas) { + // add all segments from the list to ideal state, with state CONSUMING + } + + public void persistSegmentMetadata(List<SegmentZKMetadata> segmentMetadata) { + // persist new segment metadata from list to zk + } + + /** + * Using the list of partition group metadata, create a list of equivalent segment zk metadata + */ + public List<SegmentZKMetadata> constructSegmentMetadata(List<PartitionGroupMetadata> partitionGroupMetadataList) { + List<SegmentZKMetadata> segmentZKMetadata = new ArrayList<>(); + // for each partition group construct a segment zk metadata object + return segmentZKMetadata; + } + + /** + * Using the ideal state, return a list of the current partition groups + */ + public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) { + List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(); + // from all segment names in the ideal state, find unique groups + + // create a PartitionGroupMetadata, one for each group + return partitionGroupMetadataList; + } + + public void setupNewPartitionGroups(List<PartitionGroupMetadata> newPartitionGroupMetadataList, int numReplicas) { + // construct segment zk metadata for the new partition groups + List<SegmentZKMetadata> segmentMetadata = constructSegmentMetadata(newPartitionGroupMetadataList); + + // create these new segments metadata + persistSegmentMetadata(segmentMetadata); + + // setup ideal state for the new segments + setupIdealStateForConsuming(segmentMetadata, numReplicas); + } + public boolean getIsSplitCommitEnabled() { return _controllerConf.getAcceptSplitCommit(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java new file mode 100644 index 0000000..627c964 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +public interface Checkpoint { + byte[] serialize(); + Checkpoint deserialize(byte[] blob); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java new file mode 100644 index 0000000..b0ed6e5 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +import java.util.List; + + +public interface FetchResult<T> { + Checkpoint getLastCheckpoint(); + List<T> getMessages(); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java new file mode 100644 index 0000000..2f138c2 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +public interface PartitionGroupConsumer { + FetchResult fetch(Checkpoint start, Checkpoint end, long timeout); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java new file mode 100644 index 0000000..779c167 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +import java.util.List; + + +public interface PartitionGroupMetadata { + + int getGroupId(); + + List<String> getPartitions(); + + Checkpoint getStartCheckpoint(); // similar to getStartOffset + + Checkpoint getEndCheckpoint(); // similar to getEndOffset + + void setStartCheckpoint(Checkpoint startCheckpoint); + + void setEndCheckpoint(Checkpoint endCheckpoint); + + byte[] serialize(); + + PartitionGroupMetadata deserialize(byte[] blob); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java new file mode 100644 index 0000000..1568d63 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +import java.util.List; + + +public interface PartitionGroupMetadataList { + + List<PartitionGroupMetadata> getMetadataList(); + + PartitionGroupMetadata getPartitionGroupMetadata(int index); + +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index d343203..a3e359e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -41,7 +41,7 @@ public class StreamConfig { * The type of the stream consumer either HIGHLEVEL or LOWLEVEL. For backward compatibility, adding SIMPLE which is equivalent to LOWLEVEL */ public enum ConsumerType { - HIGHLEVEL, LOWLEVEL + HIGHLEVEL, LOWLEVEL, SHARDED } public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000; @@ -273,6 +273,10 @@ public class StreamConfig { return _consumerTypes.contains(ConsumerType.LOWLEVEL); } + public boolean isShardedConsumerType() { + return _consumerTypes.size() == 1 && _consumerTypes.get(0).equals(ConsumerType.SHARDED); + } + public String getConsumerFactoryClassName() { return _consumerFactoryClassName; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index 27205c9..4db0fb1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -18,8 +18,8 @@ */ package org.apache.pinot.spi.stream; +import java.util.List; import java.util.Set; -import org.apache.pinot.spi.data.Schema; /** @@ -73,4 +73,11 @@ public abstract class StreamConsumerFactory { public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() { return new LongMsgOffsetFactory(); } + + // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state + public abstract List<PartitionGroupMetadata> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata); + + // creates a consumer which consumes from a partition group + public abstract PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org