This is an automated email from the ASF dual-hosted git repository. yupeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 53e1dbcf63 add validation for proto decoder (#9588)
53e1dbcf63 is described below

commit 53e1dbcf63a94c16964288b360c3d354d329ced5
Author: Yupeng Fu <yupe...@users.noreply.github.com>
AuthorDate: Sat Dec 31 11:07:39 2022 -0800

    add validation for proto decoder (#9588)

    * add validation for proto decoder

    * comments
---
 .../pinot/segment/local/utils/TableConfigUtils.java | 18 +++++++++++++++++-
 .../segment/local/utils/TableConfigUtilsTest.java | 19 +++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 7aaf01c787..6319431c75 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -303,12 +303,15 @@ public final class TableConfigUtils {
             "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided");
         List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
         Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream is supported in REALTIME table");
+
+        StreamConfig streamConfig;
         try {
           // Validate that StreamConfig can be created
-          new StreamConfig(tableNameWithType, streamConfigMaps.get(0));
+          streamConfig = new StreamConfig(tableNameWithType, streamConfigMaps.get(0));
         } catch (Exception e) {
           throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e);
         }
+        validateDecoder(streamConfig);
       }
 
       // Filter config
@@ -458,6 +461,19 @@ public final class TableConfigUtils {
         String.format("aggregation function %s must be one of %s", name, SUPPORTED_INGESTION_AGGREGATIONS));
   }
 
+  @VisibleForTesting
+  static void validateDecoder(StreamConfig streamConfig) {
+    if (streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder")) {
+      // check the existence of the needed decoder props
+      if (!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.descriptorFile")) {
+        throw new IllegalStateException("Missing property of descriptorFile for ProtoBufMessageDecoder");
+      }
+      if (!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.protoClassName")) {
+        throw new IllegalStateException("Missing property of protoClassName for ProtoBufMessageDecoder");
+      }
+    }
+  }
+
   @VisibleForTesting
   static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index a563e9be21..adf81f9b30 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -54,6 +54,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -537,6 +538,24 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       // expected
     }
+
+    // validate the proto decoder
+    streamConfigs = getStreamConfigs();
+    streamConfigs.put("stream.kafka.decoder.class.name",
+        "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
+    streamConfigs.put("stream.kafka.decoder.prop.descriptorFile", "file://test");
+    try {
+      TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    } catch (IllegalStateException e) {
+      // expected
+    }
+    streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile");
+    streamConfigs.put("stream.kafka.decoder.prop.protoClassName", "test");
+    try {
+      TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    } catch (IllegalStateException e) {
+      // expected
+    }
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org