This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2e08602 Pradeep/sr ssl fix (#5758) 2e08602 is described below commit 2e08602c086f02294714beaec761c2c6a9de7c30 Author: pradeepgv42 <66842697+pradeepg...@users.noreply.github.com> AuthorDate: Thu Aug 6 10:23:27 2020 -0700 Pradeep/sr ssl fix (#5758) * Stashing avro bug fixe * Only add known ssl configs * Add documentation for the new configs Co-authored-by: Pradeep Gopanapalli Venkata <pradeepvenk...@pradeeps-mbp.lan> --- docs/pluggable_streams.rst | 56 ++++++++++++++++++++++ ...aConfluentSchemaRegistryAvroMessageDecoder.java | 49 ++++++++++++++++++- 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/docs/pluggable_streams.rst b/docs/pluggable_streams.rst index 8cc231f..309c840 100644 --- a/docs/pluggable_streams.rst +++ b/docs/pluggable_streams.rst @@ -268,6 +268,62 @@ confluent schema registry: } } +Here is another example, which uses SSL-based authentication to talk to Kafka +and the schema registry. Notice there are two sets of SSL options: the ones starting with +`ssl.` are for the Kafka consumer, and the ones starting with `stream.kafka.decoder.prop.schema.registry.` +are for the `SchemaRegistryClient` used by `KafkaConfluentSchemaRegistryAvroMessageDecoder`. + + +.. 
code-block:: none + + { + "tableName": "meetupRsvp", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "mtime", + "timeType": "MILLISECONDS", + "segmentPushType": "APPEND", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "schemaName": "meetupRsvp", + "replication": "1", + "replicasPerPartition": "1" + }, + "tenants": {}, + "tableIndexConfig": { + "loadMode": "MMAP", + "streamConfigs": { + "streamType": "kafka", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.topic.name": "meetupRSVPEvents", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.zk.broker.url": "localhost:2191/kafka", + "stream.kafka.broker.list": "localhost:19092", + + "schema.registry.url": "", + "security.protocol": "", + "ssl.truststore.location": "", + "ssl.keystore.location": "", + "ssl.truststore.password": "", + "ssl.keystore.password": "", + "ssl.key.password": "", + + "stream.kafka.decoder.prop.schema.registry.rest.url": "", + "stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "", + "stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "", + "stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "", + "stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "", + "stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "", + "stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "", + "stream.kafka.decoder.prop.schema.registry.ssl.key.password": "", + "stream.kafka.decoder.prop.schema.registry.ssl.protocol": "" + } + }, + "metadata": { + "customConfigs": {} + } + } + Upgrade from Kafka 0.9 connector to Kafka 2.x connector ------------------------------------------------------- diff --git 
a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java index 4e61d9a..1f0f442 100644 --- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java +++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java @@ -21,11 +21,21 @@ package org.apache.pinot.plugin.inputformat.avro.confluent; import com.google.common.base.Preconditions; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.RestService; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + import org.apache.avro.generic.GenericData.Record; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordExtractor; @@ -42,16 +52,53 @@ import static com.google.common.base.Preconditions.checkState; */ public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMessageDecoder<byte[]> { 
private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url"; + private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry."; private KafkaAvroDeserializer _deserializer; private RecordExtractor<Record> _avroRecordExtractor; private String _topicName; + public RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) { + RestService restService = new RestService(schemaRegistryUrl); + + + ConfigDef configDef = new ConfigDef(); + SslConfigs.addClientSslSupport(configDef); + Map<String, ConfigDef.ConfigKey> configKeyMap = configDef.configKeys(); + Map<String, Object> sslConfigs = new HashMap<>(); + for (String key : configs.keySet()) { + if (!key.equals(SCHEMA_REGISTRY_REST_URL) && key.startsWith(SCHEMA_REGISTRY_OPTS_PREFIX)) { + String value = configs.get(key); + key = key.substring(SCHEMA_REGISTRY_OPTS_PREFIX.length()); + + if (configKeyMap.containsKey(key)) { + if (configKeyMap.get(key).type == ConfigDef.Type.PASSWORD) { + sslConfigs.put(key, new Password(value)); + } else { + sslConfigs.put(key, value); + } + } + } + } + + + if (!sslConfigs.isEmpty()) { + SslFactory sslFactory = new SslFactory(Mode.CLIENT); + sslFactory.configure(sslConfigs); + restService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory()); + } + return restService; + } + @Override public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) throws Exception { checkState(props.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'", SCHEMA_REGISTRY_REST_URL); String schemaRegistryUrl = props.get(SCHEMA_REGISTRY_REST_URL); - SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000); + SchemaRegistryClient schemaRegistryClient = + new CachedSchemaRegistryClient( + createRestService(schemaRegistryUrl, props), + 1000); + _deserializer = new KafkaAvroDeserializer(schemaRegistryClient); Preconditions.checkNotNull(topicName, "Topic must be 
provided"); _topicName = topicName; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org