This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 099a86cff0 Add schema as input to the decoder. (#12981) 099a86cff0 is described below commit 099a86cff0ad16a4d1a798efaf1b2118cf8e0cfb Author: Rekha Seethamraju <rekha.k...@gmail.com> AuthorDate: Wed Apr 24 12:30:36 2024 -0700 Add schema as input to the decoder. (#12981) --- .../realtime/RealtimeSegmentDataManager.java | 24 +++++++++- .../pinot/spi/stream/StreamDecoderProvider.java | 52 ---------------------- .../pinot/spi/stream/StreamMessageDecoder.java | 21 +++++++-- 3 files changed, 40 insertions(+), 57 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 6771e038d1..3290c5e4f3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -75,6 +75,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; +import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.stream.LongMsgOffset; @@ -91,7 +92,6 @@ import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.stream.StreamDataDecoder; import org.apache.pinot.spi.stream.StreamDataDecoderImpl; import org.apache.pinot.spi.stream.StreamDataDecoderResult; -import org.apache.pinot.spi.stream.StreamDecoderProvider; import org.apache.pinot.spi.stream.StreamMessage; import org.apache.pinot.spi.stream.StreamMessageDecoder; import org.apache.pinot.spi.stream.StreamMessageMetadata; @@ -1505,7 +1505,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // Create message decoder Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema); try { - StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_streamConfig, fieldsToRead); + StreamMessageDecoder streamMessageDecoder = createMessageDecoder(fieldsToRead); _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder); } catch (Exception e) { _realtimeTableDataManager.addSegmentError(_segmentNameStr, @@ -1780,6 +1780,26 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } } + /** + * Creates a {@link StreamMessageDecoder} using properties in {@link StreamConfig}. + * + * @param streamConfig The stream config from the table config + * @param fieldsToRead The fields to read from the source stream + * @return The initialized StreamMessageDecoder + */ + private StreamMessageDecoder createMessageDecoder(Set<String> fieldsToRead) { + String decoderClass = _streamConfig.getDecoderClass(); + try { + Map<String, String> decoderProperties = _streamConfig.getDecoderProperties(); + StreamMessageDecoder decoder = PluginManager.get().createInstance(decoderClass); + decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema); + return decoder; + } catch (Exception e) { + throw new RuntimeException( + "Caught exception while creating StreamMessageDecoder from stream config: " + _streamConfig, e); + } + } + @Override public MutableSegment getSegment() { return _realtimeSegment; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java deleted file mode 100644 index fdb97093de..0000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream; - -import java.util.Map; -import java.util.Set; -import org.apache.pinot.spi.plugin.PluginManager; - - -/** - * Provider for {@link StreamMessageDecoder} - */ -public class StreamDecoderProvider { - private StreamDecoderProvider() { - } - - /** - * Creates a {@link StreamMessageDecoder} using properties in {@link StreamConfig}. - * - * @param streamConfig The stream config from the table config - * @param fieldsToRead The fields to read from the source stream - * @return The initialized StreamMessageDecoder - */ - public static StreamMessageDecoder create(StreamConfig streamConfig, Set<String> fieldsToRead) { - String decoderClass = streamConfig.getDecoderClass(); - Map<String, String> decoderProperties = streamConfig.getDecoderProperties(); - try { - StreamMessageDecoder decoder = PluginManager.get().createInstance(decoderClass); - decoder.init(decoderProperties, fieldsToRead, streamConfig.getTopicName()); - return decoder; - } catch (Exception e) { - throw new RuntimeException( - "Caught exception while creating StreamMessageDecoder from stream config: " + streamConfig, e); - } - } -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java index 89312f06b6..b736e975d1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java @@ -23,9 +23,10 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; - /** * Interface for a decoder of messages fetched from the stream * @param <T> @@ -46,8 +47,22 @@ public interface StreamMessageDecoder<T> { * @param topicName Topic name of the stream * @throws Exception If an error occurs */ - void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) - throws Exception; + default void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) + throws Exception { + throw new UnsupportedOperationException("init method not implemented"); + } + + /** + * Initializes the decoder. + * @param streamConfig Can be derived from tableConfig but is passed explicitly to avoid redundant computation + * @param tableConfig Table Config of the table + * @param schema Schema of the table + * @throws Exception + */ + default void init(Set<String> fieldsToRead, StreamConfig streamConfig, TableConfig tableConfig, Schema schema) + throws Exception { + init(streamConfig.getDecoderProperties(), fieldsToRead, streamConfig.getTopicName()); + } /** * Decodes a row. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org