This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 594f1ba946 [bugfix] Fix stream config validator for protobuf decoders
(#11551)
594f1ba946 is described below
commit 594f1ba9463b8ead3eb81a8b3118ee4ca8bbc226
Author: Jeffrey Bolle <[email protected]>
AuthorDate: Sun Sep 10 07:18:30 2023 -0400
[bugfix] Fix stream config validator for protobuf decoders (#11551)
* fix stream config protobuf param check.
* decoder properties is a map of only the identified stream.decoder.prop
values.
* update protobuf stream validator check tests to test valid and invalid
paths.
* remove import.
---
.../segment/local/utils/TableConfigUtils.java | 6 ++-
.../segment/local/utils/TableConfigUtilsTest.java | 55 +++++++++++++++++++---
2 files changed, 53 insertions(+), 8 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index e8dfef485a..75a2f23054 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -517,11 +517,13 @@ public final class TableConfigUtils {
@VisibleForTesting
static void validateDecoder(StreamConfig streamConfig) {
if
(streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder"))
{
+ String descriptorFilePath = "descriptorFile";
+ String protoClassName = "protoClassName";
// check the existence of the needed decoder props
- if
(!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.descriptorFile"))
{
+ if
(!streamConfig.getDecoderProperties().containsKey(descriptorFilePath)) {
throw new IllegalStateException("Missing property of descriptorFile
for ProtoBufMessageDecoder");
}
- if
(!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.protoClassName"))
{
+ if (!streamConfig.getDecoderProperties().containsKey(protoClassName)) {
throw new IllegalStateException("Missing property of protoClassName
for ProtoBufMessageDecoder");
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index c68b77c553..985c0be825 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -617,17 +617,36 @@ public class TableConfigUtilsTest {
TableConfigUtils.validateIngestionConfig(tableConfig, null);
// validate the proto decoder
- streamConfigs = getStreamConfigs();
- streamConfigs.put("stream.kafka.decoder.class.name",
- "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
- streamConfigs.put("stream.kafka.decoder.prop.descriptorFile",
"file://test");
+ streamConfigs = getKafkaStreamConfigs();
+ //test config should be valid
+ TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+ streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile");
try {
TableConfigUtils.validateDecoder(new StreamConfig("test",
streamConfigs));
} catch (IllegalStateException e) {
// expected
}
- streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile");
- streamConfigs.put("stream.kafka.decoder.prop.protoClassName", "test");
+ streamConfigs = getKafkaStreamConfigs();
+ streamConfigs.remove("stream.kafka.decoder.prop.protoClassName");
+ try {
+ TableConfigUtils.validateDecoder(new StreamConfig("test",
streamConfigs));
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ //validate the protobuf pulsar config
+ streamConfigs = getPulsarStreamConfigs();
+ //test config should be valid
+ TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+ //remove the descriptor file, should fail
+ streamConfigs.remove("stream.pulsar.decoder.prop.descriptorFile");
+ try {
+ TableConfigUtils.validateDecoder(new StreamConfig("test",
streamConfigs));
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ streamConfigs = getPulsarStreamConfigs();
+ //remove the proto class name, should fail
+ streamConfigs.remove("stream.pulsar.decoder.prop.protoClassName");
try {
TableConfigUtils.validateDecoder(new StreamConfig("test",
streamConfigs));
} catch (IllegalStateException e) {
@@ -2092,4 +2111,28 @@ public class TableConfigUtilsTest {
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
return streamConfigs;
}
+
+ private Map<String, String> getKafkaStreamConfigs() {
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
+ streamConfigs.put("stream.kafka.topic.name", "test");
+ streamConfigs.put("stream.kafka.decoder.class.name",
+ "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
+ streamConfigs.put("stream.kafka.decoder.prop.descriptorFile",
"file://test");
+ streamConfigs.put("stream.kafka.decoder.prop.protoClassName", "test");
+ return streamConfigs;
+ }
+
+ private Map<String, String> getPulsarStreamConfigs() {
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("streamType", "pulsar");
+ streamConfigs.put("stream.pulsar.consumer.type", "lowlevel");
+ streamConfigs.put("stream.pulsar.topic.name", "test");
+ streamConfigs.put("stream.pulsar.decoder.prop.descriptorFile",
"file://test");
+ streamConfigs.put("stream.pulsar.decoder.prop.protoClassName", "test");
+ streamConfigs.put("stream.pulsar.decoder.class.name",
+ "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
+ return streamConfigs;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]