This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e08602  Pradeep/sr ssl fix (#5758)
2e08602 is described below

commit 2e08602c086f02294714beaec761c2c6a9de7c30
Author: pradeepgv42 <66842697+pradeepg...@users.noreply.github.com>
AuthorDate: Thu Aug 6 10:23:27 2020 -0700

    Pradeep/sr ssl fix (#5758)
    
    * Stashing avro bug fix
    
    * Only add known ssl configs
    
    * Add documentation for the new configs
    
    Co-authored-by: Pradeep Gopanapalli Venkata <pradeepvenk...@pradeeps-mbp.lan>
---
 docs/pluggable_streams.rst                         | 56 ++++++++++++++++++++++
 ...aConfluentSchemaRegistryAvroMessageDecoder.java | 49 ++++++++++++++++++-
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/docs/pluggable_streams.rst b/docs/pluggable_streams.rst
index 8cc231f..309c840 100644
--- a/docs/pluggable_streams.rst
+++ b/docs/pluggable_streams.rst
@@ -268,6 +268,62 @@ confluent schema registry:
     }
   }
 
+Here is another example that uses SSL-based authentication to talk to Kafka
+and the schema registry. Note that there are two sets of SSL options: keys
+starting with ``ssl.`` configure the Kafka consumer, while keys starting with
+``stream.kafka.decoder.prop.schema.registry.`` configure the
+``SchemaRegistryClient`` used by
+``KafkaConfluentSchemaRegistryAvroMessageDecoder``. A worked example of how
+one of these keys is renamed on its way to the registry client follows the
+config below.
+
+
+.. code-block:: none
+
+  {
+    "tableName": "meetupRsvp",
+    "tableType": "REALTIME",
+    "segmentsConfig": {
+      "timeColumnName": "mtime",
+      "timeType": "MILLISECONDS",
+      "segmentPushType": "APPEND",
+      "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+      "schemaName": "meetupRsvp",
+      "replication": "1",
+      "replicasPerPartition": "1"
+    },
+    "tenants": {},
+    "tableIndexConfig": {
+      "loadMode": "MMAP",
+      "streamConfigs": {
+        "streamType": "kafka",
+        "stream.kafka.consumer.type": "LowLevel",
+        "stream.kafka.topic.name": "meetupRSVPEvents",
+        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
+        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+        "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+        "stream.kafka.broker.list": "localhost:19092",
+
+        "schema.registry.url": "",
+        "security.protocol": "",
+        "ssl.truststore.location": "",
+        "ssl.keystore.location": "",
+        "ssl.truststore.password": "",
+        "ssl.keystore.password": "",
+        "ssl.key.password": "",
+
+        "stream.kafka.decoder.prop.schema.registry.rest.url": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
+        "stream.kafka.decoder.prop.schema.registry.ssl.protocol": ""
+      }
+    },
+    "metadata": {
+      "customConfigs": {}
+    }
+  }
+
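+For illustration, here is the journey of one of these keys (the two
+intermediate forms below follow from the prefix stripping described above and
+are shown for explanation only):
+
+.. code-block:: none
+
+  # As written in streamConfigs:
+  stream.kafka.decoder.prop.schema.registry.ssl.truststore.location
+
+  # As received by KafkaConfluentSchemaRegistryAvroMessageDecoder, after
+  # Pinot strips the stream.kafka.decoder.prop. prefix:
+  schema.registry.ssl.truststore.location
+
+  # As applied to the RestService's SSL factory, after the decoder strips
+  # the schema.registry. prefix:
+  ssl.truststore.location
+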
 Upgrade from Kafka 0.9 connector to Kafka 2.x connector
 -------------------------------------------------------
 
diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
index 4e61d9a..1f0f442 100644
--- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
@@ -21,11 +21,21 @@ package org.apache.pinot.plugin.inputformat.avro.confluent;
 import com.google.common.base.Preconditions;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
@@ -42,16 +52,53 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
   private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
+  private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry.";
   private KafkaAvroDeserializer _deserializer;
   private RecordExtractor<Record> _avroRecordExtractor;
   private String _topicName;
 
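+  /**
+   * Creates the RestService used by the schema registry client. Decoder
+   * properties prefixed with "schema.registry." (other than the registry URL
+   * itself) are treated as Kafka client SSL options; when any are present and
+   * recognized, they are used to build an SslFactory so that the registry can
+   * be reached over HTTPS.
+   */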
+  public RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) {
+    RestService restService = new RestService(schemaRegistryUrl);
+
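+    // SslConfigs.addClientSslSupport registers the standard Kafka client SSL
+    // keys (ssl.truststore.location, ssl.key.password, ...) together with
+    // their types, so unknown options can be filtered out and password values
+    // identified below.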
+    ConfigDef configDef = new ConfigDef();
+    SslConfigs.addClientSslSupport(configDef);
+    Map<String, ConfigDef.ConfigKey> configKeyMap = configDef.configKeys();
+    Map<String, Object> sslConfigs = new HashMap<>();
+    for (String key : configs.keySet()) {
+      if (!key.equals(SCHEMA_REGISTRY_REST_URL) && key.startsWith(SCHEMA_REGISTRY_OPTS_PREFIX)) {
+        String value = configs.get(key);
+        key = key.substring(SCHEMA_REGISTRY_OPTS_PREFIX.length());
+
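+        // Kafka's SSL configuration expects password-typed values to be
+        // wrapped in Password; all other values are passed through as strings.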
+        if (configKeyMap.containsKey(key)) {
+          if (configKeyMap.get(key).type == ConfigDef.Type.PASSWORD) {
+            sslConfigs.put(key, new Password(value));
+          } else {
+            sslConfigs.put(key, value);
+          }
+        }
+      }
+    }
+
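+    // Only build an SslFactory if at least one recognized SSL option was
+    // supplied; otherwise the RestService keeps its default socket factory.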
+    if (!sslConfigs.isEmpty()) {
+      SslFactory sslFactory = new SslFactory(Mode.CLIENT);
+      sslFactory.configure(sslConfigs);
+      restService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory());
+    }
+    return restService;
+  }
+
   @Override
   public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
       throws Exception {
     checkState(props.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'", SCHEMA_REGISTRY_REST_URL);
     String schemaRegistryUrl = props.get(SCHEMA_REGISTRY_REST_URL);
-    SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
+    SchemaRegistryClient schemaRegistryClient =
+        new CachedSchemaRegistryClient(createRestService(schemaRegistryUrl, props), 1000);
+
     _deserializer = new KafkaAvroDeserializer(schemaRegistryClient);
     Preconditions.checkNotNull(topicName, "Topic must be provided");
     _topicName = topicName;

