This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 42a2fc70ff Add Support for Schema Registry in Protobuf Decoder (#9220)
42a2fc70ff is described below

commit 42a2fc70ff37fddaaba300becd36b41f170e77bd
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Thu Aug 18 12:17:59 2022 +0530

    Add Support for Schema Registry in Protobuf Decoder (#9220)
    
    * working on schema registry. refactor pending
    
    * Add tests for confluent protobuf and delete duplicate files
    
    * move dependency versions to parent pom
    
    * Fix protobuf decoder bug: Not honouring offset and length
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../v0_deprecated/pinot-ingestion-common/pom.xml   |   6 +
 .../pinot-confluent-avro/pom.xml                   |   1 -
 .../pinot-input-format/pinot-protobuf/pom.xml      |  97 +++++++++++++-
 ...fluentSchemaRegistryProtoBufMessageDecoder.java | 143 +++++++++++++++++++++
 .../protobuf/ProtoBufMessageDecoder.java           |  11 +-
 .../protobuf/ProtoBufConfluentSchemaTest.java      | 130 +++++++++++++++++++
 .../schemaregistry/SchemaRegistryStarter.java      | 105 +++++++++++++++
 pom.xml                                            |   1 +
 8 files changed, 486 insertions(+), 8 deletions(-)
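
For context, a minimal sketch (not part of this commit) of driving the new decoder directly, modeled on the ProtoBufConfluentSchemaTest added below. The registry URL, topic name, class name DecoderUsageSketch, and payload source are placeholders; "schema.registry.rest.url" is the one property the decoder requires.

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder;
import org.apache.pinot.spi.data.readers.GenericRow;

public class DecoderUsageSketch {
  public static GenericRow decodeOne(byte[] confluentFramedValue) throws Exception {
    // Placeholder registry URL; point this at a real Confluent Schema Registry.
    Map<String, String> props = new HashMap<>();
    props.put("schema.registry.rest.url", "http://localhost:8081");

    KafkaConfluentSchemaRegistryProtoBufMessageDecoder decoder =
        new KafkaConfluentSchemaRegistryProtoBufMessageDecoder();
    decoder.init(props, null, "test_topic_proto"); // null fieldsToRead, as in the test

    // The payload is expected to be a Kafka record value written by Confluent's
    // KafkaProtobufSerializer (Confluent wire format).
    return decoder.decode(confluentFramedValue, new GenericRow());
  }
}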

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
index 8fb34e6fe3..4d0f323e2b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
@@ -33,6 +33,12 @@
   <artifactId>pinot-ingestion-common</artifactId>
   <name>Pinot Ingestion Common</name>
   <url>https://pinot.apache.org/</url>
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
   <properties>
     <pinot.root>${basedir}/../../../..</pinot.root>
     <phase.prop>package</phase.prop>
diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
index 57d46b0d5b..e1e4f87b8a 100644
--- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
@@ -35,7 +35,6 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <kafka.lib.version>2.8.1</kafka.lib.version>
-    <confluent.version>5.3.1</confluent.version>
     <phase.prop>package</phase.prop>
   </properties>
   <repositories>
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
index d3befbbc91..c1e25bb9b6 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
@@ -35,9 +35,16 @@
   <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
-    <proto.version>3.11.4</proto.version>
+    <kafka.lib.version>2.8.1</kafka.lib.version>
+    <testcontainers.version>1.17.3</testcontainers.version>
     <phase.prop>package</phase.prop>
   </properties>
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
   <dependencies>
     <dependency>
       <groupId>commons-lang</groupId>
@@ -46,7 +53,6 @@
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
-      <version>${proto.version}</version>
     </dependency>
     <dependency>
       <groupId>com.github.os72</groupId>
@@ -59,6 +65,93 @@
         </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.lib.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry-client</artifactId>
+      <version>${confluent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.swagger</groupId>
+          <artifactId>swagger-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-protobuf-serializer</artifactId>
+      <version>${confluent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jetbrains.kotlin</groupId>
+          <artifactId>kotlin-stdlib-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.squareup.okio</groupId>
+          <artifactId>okio</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.errorprone</groupId>
+          <artifactId>error_prone_annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java
new file mode 100644
index 0000000000..5556e6a97e
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractor;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+public class KafkaConfluentSchemaRegistryProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(KafkaConfluentSchemaRegistryProtoBufMessageDecoder.class);
+  private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
+  private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry.";
+  public static final String CACHED_SCHEMA_MAP_CAPACITY = "cached.schema.map.capacity";
+  public static final String DEFAULT_CACHED_SCHEMA_MAP_CAPACITY = "1000";
+
+  private KafkaProtobufDeserializer<Message> _deserializer;
+  private RecordExtractor<Message> _protoBufRecordExtractor;
+  private String _topicName;
+
+  private RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) {
+    RestService restService = new RestService(schemaRegistryUrl);
+
+    ConfigDef configDef = new ConfigDef();
+    SslConfigs.addClientSslSupport(configDef);
+    Map<String, ConfigDef.ConfigKey> configKeyMap = configDef.configKeys();
+    Map<String, Object> sslConfigs = new HashMap<>();
+    for (String key : configs.keySet()) {
+      if (!key.equals(SCHEMA_REGISTRY_REST_URL) && key.startsWith(SCHEMA_REGISTRY_OPTS_PREFIX)) {
+        String value = configs.get(key);
+        String schemaRegistryOptKey = key.substring(SCHEMA_REGISTRY_OPTS_PREFIX.length());
+
+        if (configKeyMap.containsKey(schemaRegistryOptKey)) {
+          if (configKeyMap.get(schemaRegistryOptKey).type == ConfigDef.Type.PASSWORD) {
+            sslConfigs.put(schemaRegistryOptKey, new Password(value));
+          } else {
+            sslConfigs.put(schemaRegistryOptKey, value);
+          }
+        }
+      }
+    }
+
+    if (!sslConfigs.isEmpty()) {
+      DefaultSslEngineFactory sslFactory = new DefaultSslEngineFactory();
+      sslFactory.configure(sslConfigs);
+      restService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory());
+    }
+    return restService;
+  }
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    checkState(props.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'", SCHEMA_REGISTRY_REST_URL);
+    String schemaRegistryUrl = props.get(SCHEMA_REGISTRY_REST_URL);
+    ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
+    int identityMapCapacity = Integer.parseInt(
+        props.getOrDefault(CACHED_SCHEMA_MAP_CAPACITY, DEFAULT_CACHED_SCHEMA_MAP_CAPACITY));
+    SchemaRegistryClient schemaRegistryClient =
+        new CachedSchemaRegistryClient(createRestService(schemaRegistryUrl, props),
+            identityMapCapacity, Collections.singletonList(protobufSchemaProvider), props, null);
+    _deserializer = new KafkaProtobufDeserializer<>(schemaRegistryClient);
+    Preconditions.checkNotNull(topicName, "Topic must be provided");
+    _topicName = topicName;
+    _protoBufRecordExtractor = PluginManager.get().createInstance(ProtoBufRecordExtractor.class.getName());
+    _protoBufRecordExtractor.init(fieldsToRead, null);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    try {
+      Message protoMessage = _deserializer.deserialize(_topicName, payload);
+      return _protoBufRecordExtractor.extract(protoMessage, destination);
+    } catch (RuntimeException e) {
+      ignoreOrRethrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+  }
+
+  /**
+   * This method handles specific serialisation exceptions. If the exception cannot be ignored the method
+   * re-throws the exception.
+   *
+   * @param e exception to handle
+   */
+  private void ignoreOrRethrowException(RuntimeException e) {
+    if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+      // Do nothing, the message is not a ProtoBuf message and can't be decoded
+      LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+      return;
+    }
+    throw e;
+  }
+
+  private boolean isUnknownMagicByte(Throwable e) {
+    return e != null && e instanceof SerializationException && e.getMessage() != null && e.getMessage().toLowerCase()
+        .contains("unknown magic byte");
+  }
+}
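
A note on the createRestService() method above: any decoder property prefixed with "schema.registry." (other than the REST URL itself) is matched against Kafka's client SSL config keys and, on a match, is used to build the SSL socket factory for the registry's RestService. Below is a hedged sketch of such a props map; the class name, paths, and password are placeholders, while the key suffixes are the standard Kafka SSL configs registered by SslConfigs.addClientSslSupport().

import java.util.HashMap;
import java.util.Map;

public class SchemaRegistryTlsPropsSketch {
  // Returns a hypothetical decoder props map with TLS settings for the registry client.
  // Stripped of the "schema.registry." prefix, the SSL keys become
  // ssl.truststore.location / ssl.truststore.password.
  public static Map<String, String> example() {
    Map<String, String> props = new HashMap<>();
    props.put("schema.registry.rest.url", "https://schema-registry.example.com:8081");
    props.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
    props.put("schema.registry.ssl.truststore.password", "changeit");
    return props;
  }
}
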
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
index db1995a6b2..fb0c5b3e8d 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
@@ -25,6 +25,7 @@ import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
@@ -78,11 +79,6 @@ public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
 
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    return decode(payload, 0, payload.length, destination);
-  }
-
-  @Override
-  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
     Message message;
     try {
       _builder.mergeFrom(payload);
@@ -96,4 +92,9 @@ public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
     _recordExtractor.extract(message, destination);
     return destination;
   }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java
new file mode 100644
index 0000000000..ca0270ada4
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.plugin.inputformat.protobuf.kafka.schemaregistry.SchemaRegistryStarter;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ProtoBufConfluentSchemaTest {
+  public static final String TOPIC_PROTO = "test_topic_proto";
+  SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry;
+  private Producer<byte[], Message> _protoProducer;
+
+  @BeforeClass
+  public void setup() {
+    _schemaRegistry = SchemaRegistryStarter.startLocalInstance(9093);
+
+    Properties protoBufProducerProps = new Properties();
+    protoBufProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        _schemaRegistry._kafkaContainer.getBootstrapServers());
+    protoBufProducerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl());
+    protoBufProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    protoBufProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
+    _protoProducer = new KafkaProducer<>(protoBufProducerProps);
+  }
+
+  @Test
+  public void testSamplePinotConsumer()
+      throws Exception {
+    int numRecords = 10;
+    List<Sample.SampleRecord> recordList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      Sample.SampleRecord sampleRecord = Sample.SampleRecord.newBuilder().addFriends(UUID.randomUUID().toString())
+          .addFriends(UUID.randomUUID().toString()).setEmail(UUID.randomUUID().toString())
+          .setName(UUID.randomUUID().toString()).setId(i).build();
+
+      _protoProducer.send(new ProducerRecord<>(TOPIC_PROTO, sampleRecord));
+      recordList.add(sampleRecord);
+    }
+
+    Properties consumerProps = new Properties();
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _schemaRegistry._kafkaContainer.getBootstrapServers());
+    consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl());
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "foo_bar");
+    KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);
+    kafkaConsumer.subscribe(Collections.singletonList(TOPIC_PROTO));
+    ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
+    Iterator<ConsumerRecord<byte[], byte[]>> iter = consumerRecords.iterator();
+
+    KafkaConfluentSchemaRegistryProtoBufMessageDecoder decoder =
+        new KafkaConfluentSchemaRegistryProtoBufMessageDecoder();
+    Map<String, String> decoderProps = new HashMap<>();
+    decoderProps.put("schema.registry.rest.url", _schemaRegistry.getUrl());
+    decoder.init(decoderProps, null, TOPIC_PROTO);
+    GenericRow reuse = new GenericRow();
+    List<GenericRow> result = new ArrayList<>();
+    while (iter.hasNext()) {
+      byte[] arr = iter.next().value();
+      decoder.decode(arr, reuse);
+      result.add(reuse.copy());
+      reuse.clear();
+    }
+
+    Assert.assertEquals(result.size(), numRecords);
+
+    for (int i = 0; i < numRecords; i++) {
+      Sample.SampleRecord originalValue = recordList.get(i);
+      GenericRow decodedValue = result.get(i);
+
+      for (Map.Entry<Descriptors.FieldDescriptor, Object> fieldWithValue : originalValue.getAllFields().entrySet()) {
+        Assert.assertNotNull(decodedValue.getValue(fieldWithValue.getKey().getName()));
+        if (!fieldWithValue.getKey().isRepeated()) {
+          Assert.assertEquals(fieldWithValue.getValue(), decodedValue.getValue(fieldWithValue.getKey().getName()));
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _schemaRegistry.stop();
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java
new file mode 100644
index 0000000000..c6d7a59770
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf.kafka.schemaregistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+
+public class SchemaRegistryStarter {
+  public static final int DEFAULT_PORT = 8081;
+  private static final String CONFLUENT_PLATFORM_VERSION = "7.2.0";
+  private static final DockerImageName KAFKA_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION);
+  private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryStarter.class);
+
+  private SchemaRegistryStarter() {
+  }
+
+  public static KafkaSchemaRegistryInstance startLocalInstance(int port) {
+    KafkaSchemaRegistryInstance kafkaSchemaRegistry = new KafkaSchemaRegistryInstance(port);
+    kafkaSchemaRegistry.start();
+    return kafkaSchemaRegistry;
+  }
+
+  public static class KafkaSchemaRegistryInstance {
+    private final int _port;
+    public KafkaContainer _kafkaContainer;
+    private Network _network;
+    private GenericContainer _schemaRegistryContainer;
+
+    private KafkaSchemaRegistryInstance(int port) {
+      _port = port;
+    }
+
+    public String getUrl() {
+      return "http://" + _schemaRegistryContainer.getHost() + ":" + _schemaRegistryContainer.getMappedPort(_port);
+    }
+
+    public void start() {
+      LOGGER.info("Starting schema registry");
+      if (_kafkaContainer != null || _schemaRegistryContainer != null) {
+        throw new IllegalStateException("Schema registry is already running");
+      }
+
+      _network = Network.newNetwork();
+
+      _kafkaContainer = new KafkaContainer(KAFKA_DOCKER_IMAGE_NAME).withNetwork(_network).withNetworkAliases("kafka")
+          .withCreateContainerCmdModifier(it -> it.withHostName("kafka")).waitingFor(Wait.forListeningPort());
+      _kafkaContainer.start();
+
+      Map<String, String> schemaRegistryProps = new HashMap<>();
+      schemaRegistryProps.put("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", 
"kafka:9092");
+      schemaRegistryProps.put("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry");
+      schemaRegistryProps.put("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:"; + 
_port);
+      schemaRegistryProps.put("SCHEMA_REGISTRY_DEBUG", "true");
+      _schemaRegistryContainer =
+          new GenericContainer(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME).dependsOn(_kafkaContainer).withNetwork(_network)
+              .withNetworkAliases("schemaregistry").withEnv(schemaRegistryProps).withExposedPorts(_port)
+              .waitingFor(Wait.forListeningPort());
+      _schemaRegistryContainer.start();
+    }
+
+    public void stop() {
+      LOGGER.info("Stopping schema registry");
+      if (_schemaRegistryContainer != null) {
+        _schemaRegistryContainer.stop();
+        _schemaRegistryContainer = null;
+      }
+
+      if (_kafkaContainer != null) {
+        _kafkaContainer.stop();
+        _kafkaContainer = null;
+      }
+
+      if (_network != null) {
+        _network.close();
+      }
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index b61f381f90..5876c72893 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,6 +168,7 @@
     <kafka.version>2.0</kafka.version>
     <protobuf.version>3.19.2</protobuf.version>
     <grpc.version>1.41.0</grpc.version>
+    <confluent.version>5.5.3</confluent.version>
 
     <!-- Checkstyle violation prop.-->
     <checkstyle.violation.severity>warning</checkstyle.violation.severity>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
