This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 099a86cff0 Add schema as input to the decoder. (#12981)
099a86cff0 is described below

commit 099a86cff0ad16a4d1a798efaf1b2118cf8e0cfb
Author: Rekha Seethamraju <rekha.k...@gmail.com>
AuthorDate: Wed Apr 24 12:30:36 2024 -0700

    Add schema as input to the decoder. (#12981)
---
 .../realtime/RealtimeSegmentDataManager.java       | 24 +++++++++-
 .../pinot/spi/stream/StreamDecoderProvider.java    | 52 ----------------------
 .../pinot/spi/stream/StreamMessageDecoder.java     | 21 +++++++--
 3 files changed, 40 insertions(+), 57 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 6771e038d1..3290c5e4f3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -75,6 +75,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -91,7 +92,6 @@ import 
org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamDataDecoder;
 import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
 import org.apache.pinot.spi.stream.StreamDataDecoderResult;
-import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamMessage;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
@@ -1505,7 +1505,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     // Create message decoder
     Set<String> fieldsToRead = 
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), 
_schema);
     try {
-      StreamMessageDecoder streamMessageDecoder = 
StreamDecoderProvider.create(_streamConfig, fieldsToRead);
+      StreamMessageDecoder streamMessageDecoder = 
createMessageDecoder(fieldsToRead);
       _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
     } catch (Exception e) {
       _realtimeTableDataManager.addSegmentError(_segmentNameStr,
@@ -1780,6 +1780,26 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     }
   }
 
+  /**
+   * Creates a {@link StreamMessageDecoder} using properties in {@link 
StreamConfig}.
+   *
+   * @param streamConfig The stream config from the table config
+   * @param fieldsToRead The fields to read from the source stream
+   * @return The initialized StreamMessageDecoder
+   */
+  private StreamMessageDecoder createMessageDecoder(Set<String> fieldsToRead) {
+    String decoderClass = _streamConfig.getDecoderClass();
+    try {
+      Map<String, String> decoderProperties = 
_streamConfig.getDecoderProperties();
+      StreamMessageDecoder decoder = 
PluginManager.get().createInstance(decoderClass);
+      decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema);
+      return decoder;
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Caught exception while creating StreamMessageDecoder from stream 
config: " + _streamConfig, e);
+    }
+  }
+
   @Override
   public MutableSegment getSegment() {
     return _realtimeSegment;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
deleted file mode 100644
index fdb97093de..0000000000
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-import java.util.Map;
-import java.util.Set;
-import org.apache.pinot.spi.plugin.PluginManager;
-
-
-/**
- * Provider for {@link StreamMessageDecoder}
- */
-public class StreamDecoderProvider {
-  private StreamDecoderProvider() {
-  }
-
-  /**
-   * Creates a {@link StreamMessageDecoder} using properties in {@link 
StreamConfig}.
-   *
-   * @param streamConfig The stream config from the table config
-   * @param fieldsToRead The fields to read from the source stream
-   * @return The initialized StreamMessageDecoder
-   */
-  public static StreamMessageDecoder create(StreamConfig streamConfig, 
Set<String> fieldsToRead) {
-    String decoderClass = streamConfig.getDecoderClass();
-    Map<String, String> decoderProperties = 
streamConfig.getDecoderProperties();
-    try {
-      StreamMessageDecoder decoder = 
PluginManager.get().createInstance(decoderClass);
-      decoder.init(decoderProperties, fieldsToRead, 
streamConfig.getTopicName());
-      return decoder;
-    } catch (Exception e) {
-      throw new RuntimeException(
-          "Caught exception while creating StreamMessageDecoder from stream 
config: " + streamConfig, e);
-    }
-  }
-}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index 89312f06b6..b736e975d1 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -23,9 +23,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
-
 /**
  * Interface for a decoder of messages fetched from the stream
  * @param <T>
@@ -46,8 +47,22 @@ public interface StreamMessageDecoder<T> {
    * @param topicName Topic name of the stream
    * @throws Exception If an error occurs
    */
-  void init(Map<String, String> props, Set<String> fieldsToRead, String 
topicName)
-      throws Exception;
+  default void init(Map<String, String> props, Set<String> fieldsToRead, 
String topicName)
+      throws Exception {
+    throw new UnsupportedOperationException("init method not implemented");
+  }
+
+  /**
+   * Initializes the decoder.
+   * @param streamConfig Can be derived from tableConfig but is passed 
explicitly to avoid redundant computation
+   * @param tableConfig Table Config of the table
+   * @param schema Schema of the table
+   * @throws Exception
+   */
+  default void init(Set<String> fieldsToRead, StreamConfig streamConfig, 
TableConfig tableConfig, Schema schema)
+      throws Exception {
+    init(streamConfig.getDecoderProperties(), fieldsToRead, 
streamConfig.getTopicName());
+  }
 
   /**
    * Decodes a row.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to