This is an automated email from the ASF dual-hosted git repository.

yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 53e1dbcf63 add validation for proto decoder (#9588)
53e1dbcf63 is described below

commit 53e1dbcf63a94c16964288b360c3d354d329ced5
Author: Yupeng Fu <yupe...@users.noreply.github.com>
AuthorDate: Sat Dec 31 11:07:39 2022 -0800

    add validation for proto decoder (#9588)
    
    * add validation for proto decoder
    
    * comments
---
 .../pinot/segment/local/utils/TableConfigUtils.java   | 18 +++++++++++++++++-
 .../segment/local/utils/TableConfigUtilsTest.java     | 19 +++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 7aaf01c787..6319431c75 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -303,12 +303,15 @@ public final class TableConfigUtils {
             "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided");
         List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
         Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream is supported in REALTIME table");
+
+        StreamConfig streamConfig;
         try {
           // Validate that StreamConfig can be created
-          new StreamConfig(tableNameWithType, streamConfigMaps.get(0));
+          streamConfig = new StreamConfig(tableNameWithType, streamConfigMaps.get(0));
         } catch (Exception e) {
           throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e);
         }
+        validateDecoder(streamConfig);
       }
 
       // Filter config
@@ -458,6 +461,19 @@ public final class TableConfigUtils {
         String.format("aggregation function %s must be one of %s", name, SUPPORTED_INGESTION_AGGREGATIONS));
   }
 
+  @VisibleForTesting
+  static void validateDecoder(StreamConfig streamConfig) {
+    if (streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder")) {
+      // check the existence of the needed decoder props
+      if (!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.descriptorFile")) {
+        throw new IllegalStateException("Missing property of descriptorFile for ProtoBufMessageDecoder");
+      }
+      if (!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.protoClassName")) {
+        throw new IllegalStateException("Missing property of protoClassName for ProtoBufMessageDecoder");
+      }
+    }
+  }
+
   @VisibleForTesting
   static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index a563e9be21..adf81f9b30 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -54,6 +54,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -537,6 +538,24 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       // expected
     }
+
+    // validate the proto decoder
+    streamConfigs = getStreamConfigs();
+    streamConfigs.put("stream.kafka.decoder.class.name",
+        "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
+    streamConfigs.put("stream.kafka.decoder.prop.descriptorFile", "file://test");
+    try {
+      TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    } catch (IllegalStateException e) {
+      // expected
+    }
+    streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile");
+    streamConfigs.put("stream.kafka.decoder.prop.protoClassName", "test");
+    try {
+      TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    } catch (IllegalStateException e) {
+      // expected
+    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to