pradeepgv42 commented on a change in pull request #5758:
URL: https://github.com/apache/incubator-pinot/pull/5758#discussion_r461904284



##########
File path: pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
##########
@@ -46,12 +54,50 @@
   private RecordExtractor<Record> _avroRecordExtractor;
   private String _topicName;
 
+  public RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) {
+    RestService restService = new RestService(schemaRegistryUrl);
+    Map<String, Object> asslConfigs = configs.entrySet().stream()

Review comment:
       @elonazoulay This is an outdated copy of the code. Can you look at the latest version here?
   https://github.com/apache/incubator-pinot/pull/5758/files

##########
File path: pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
##########
@@ -46,12 +54,50 @@
   private RecordExtractor<Record> _avroRecordExtractor;
   private String _topicName;
 
+  public RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) {
+    RestService restService = new RestService(schemaRegistryUrl);
+    Map<String, Object> asslConfigs = configs.entrySet().stream()
+            .filter(e -> !e.equals(SCHEMA_REGISTRY_REST_URL))
+            .filter(e -> e.getKey().startsWith("schema.registry."))
+            .collect(Collectors.toMap(
+                    e -> e.getKey().substring("schema.registry.".length()),
+                    Map.Entry::getValue));
+
+    Map<String, Object> sslConfigs = new HashMap<>();
+    for (String key : configs.keySet()) {
+      if (!key.equals(SCHEMA_REGISTRY_REST_URL) && key.startsWith("schema.registry.")) {
+        String value = configs.get(key);
+        key = key.substring("schema.registry.".length());
+        if (key.contains("password")) {
+          sslConfigs.put(key, new Password(value));
+        } else {
+          sslConfigs.put(key, value);
+        }
+      }
+    }
+
+    System.out.println(configs.toString());

Review comment:
       @elonazoulay This is an outdated copy of the code. Can you look at the latest version here?
   https://github.com/apache/incubator-pinot/pull/5758/files

##########
File path: pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
##########
@@ -46,12 +54,50 @@
   private RecordExtractor<Record> _avroRecordExtractor;
   private String _topicName;
 
+  public RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) {
+    RestService restService = new RestService(schemaRegistryUrl);
+    Map<String, Object> asslConfigs = configs.entrySet().stream()
+            .filter(e -> !e.equals(SCHEMA_REGISTRY_REST_URL))
+            .filter(e -> e.getKey().startsWith("schema.registry."))
+            .collect(Collectors.toMap(
+                    e -> e.getKey().substring("schema.registry.".length()),
+                    Map.Entry::getValue));
+
+    Map<String, Object> sslConfigs = new HashMap<>();
+    for (String key : configs.keySet()) {
+      if (!key.equals(SCHEMA_REGISTRY_REST_URL) && key.startsWith("schema.registry.")) {

Review comment:
       Sorry, I'm not very familiar with this. Do you have an example, or could you give a bit more info?






