This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 42a2fc70ff  Add Support for Schema Registry in Protobuf Decoder (#9220)
42a2fc70ff is described below

commit 42a2fc70ff37fddaaba300becd36b41f170e77bd
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Thu Aug 18 12:17:59 2022 +0530

    Add Support for Schema Registry in Protobuf Decoder (#9220)

    * working on schema registry. refactor pending

    * Add tests for confluent protobuf and delete duplicate files

    * move dependency versions to parent pom

    * Fix protobuf decoder bug: Not honouring offset and length

    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../v0_deprecated/pinot-ingestion-common/pom.xml   |   6 +
 .../pinot-confluent-avro/pom.xml                   |   1 -
 .../pinot-input-format/pinot-protobuf/pom.xml      |  97 +++++++++++++-
 ...fluentSchemaRegistryProtoBufMessageDecoder.java | 143 +++++++++++++++++++++
 .../protobuf/ProtoBufMessageDecoder.java           |  11 +-
 .../protobuf/ProtoBufConfluentSchemaTest.java      | 130 +++++++++++++++
 .../schemaregistry/SchemaRegistryStarter.java      | 105 +++++++++++++++
 pom.xml                                            |   1 +
 8 files changed, 486 insertions(+), 8 deletions(-)
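[Usage sketch, not part of this patch: the new KafkaConfluentSchemaRegistryProtoBufMessageDecoder introduced below is driven entirely by decoder properties. The snippet mirrors the init()/decode() calls made in the new test; the registry URL, topic name, and payload bytes are placeholders, and passing null for fieldsToRead follows the test's convention of extracting all fields.]

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder;
    import org.apache.pinot.spi.data.readers.GenericRow;

    public class DecoderUsageSketch {
      public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        // Required: REST endpoint of the Confluent schema registry (placeholder URL).
        props.put("schema.registry.rest.url", "http://localhost:8081");
        // Optional: capacity of the cached schema map; the decoder defaults to 1000.
        props.put("cached.schema.map.capacity", "1000");

        KafkaConfluentSchemaRegistryProtoBufMessageDecoder decoder =
            new KafkaConfluentSchemaRegistryProtoBufMessageDecoder();
        // null fieldsToRead -> extract all fields; "my_topic" is a placeholder topic name.
        decoder.init(props, null, "my_topic");

        // Placeholder for a Kafka record value framed by the Confluent serializer;
        // the decoder resolves the ProtoBuf schema from the registry at decode time.
        byte[] payload = new byte[0];
        GenericRow row = decoder.decode(payload, new GenericRow());
      }
    }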
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
index 8fb34e6fe3..4d0f323e2b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
@@ -33,6 +33,12 @@
   <artifactId>pinot-ingestion-common</artifactId>
   <name>Pinot Ingestion Common</name>
   <url>https://pinot.apache.org/</url>
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
   <properties>
     <pinot.root>${basedir}/../../../..</pinot.root>
     <phase.prop>package</phase.prop>
diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
index 57d46b0d5b..e1e4f87b8a 100644
--- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
@@ -35,7 +35,6 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <kafka.lib.version>2.8.1</kafka.lib.version>
-    <confluent.version>5.3.1</confluent.version>
    <phase.prop>package</phase.prop>
   </properties>
   <repositories>
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
index d3befbbc91..c1e25bb9b6 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
@@ -35,9 +35,16 @@
   <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
-    <proto.version>3.11.4</proto.version>
+    <kafka.lib.version>2.8.1</kafka.lib.version>
+    <testcontainers.version>1.17.3</testcontainers.version>
     <phase.prop>package</phase.prop>
   </properties>
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
   <dependencies>
     <dependency>
       <groupId>commons-lang</groupId>
@@ -46,7 +53,6 @@
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
-      <version>${proto.version}</version>
     </dependency>
     <dependency>
       <groupId>com.github.os72</groupId>
@@ -59,6 +65,93 @@
         </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.lib.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry-client</artifactId>
+      <version>${confluent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.swagger</groupId>
+          <artifactId>swagger-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-protobuf-serializer</artifactId>
+      <version>${confluent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jetbrains.kotlin</groupId>
+          <artifactId>kotlin-stdlib-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.squareup.okio</groupId>
+          <artifactId>okio</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.errorprone</groupId>
+          <artifactId>error_prone_annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java
new file mode 100644
index 0000000000..5556e6a97e
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractor;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+public class KafkaConfluentSchemaRegistryProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(KafkaConfluentSchemaRegistryProtoBufMessageDecoder.class);
+  private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
+  private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry.";
+  public static final String CACHED_SCHEMA_MAP_CAPACITY = "cached.schema.map.capacity";
+  public static final String DEFAULT_CACHED_SCHEMA_MAP_CAPACITY = "1000";
+
+  private KafkaProtobufDeserializer<Message> _deserializer;
+  private RecordExtractor<Message> _protoBufRecordExtractor;
+  private String _topicName;
+
+  private RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) {
+    RestService restService = new RestService(schemaRegistryUrl);
+
+    ConfigDef configDef = new ConfigDef();
+    SslConfigs.addClientSslSupport(configDef);
+    Map<String, ConfigDef.ConfigKey> configKeyMap = configDef.configKeys();
+    Map<String, Object> sslConfigs = new HashMap<>();
+    for (String key : configs.keySet()) {
+      if (!key.equals(SCHEMA_REGISTRY_REST_URL) && key.startsWith(SCHEMA_REGISTRY_OPTS_PREFIX)) {
+        String value = configs.get(key);
+        String schemaRegistryOptKey = key.substring(SCHEMA_REGISTRY_OPTS_PREFIX.length());
+
+        if (configKeyMap.containsKey(schemaRegistryOptKey)) {
+          if (configKeyMap.get(schemaRegistryOptKey).type == ConfigDef.Type.PASSWORD) {
+            sslConfigs.put(schemaRegistryOptKey, new Password(value));
+          } else {
+            sslConfigs.put(schemaRegistryOptKey, value);
+          }
+        }
+      }
+    }
+
+    if (!sslConfigs.isEmpty()) {
+      DefaultSslEngineFactory sslFactory = new DefaultSslEngineFactory();
+      sslFactory.configure(sslConfigs);
+      restService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory());
+    }
+    return restService;
+  }
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    checkState(props.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'",
+        SCHEMA_REGISTRY_REST_URL);
+    String schemaRegistryUrl = props.get(SCHEMA_REGISTRY_REST_URL);
+    ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
+    int identityMapCapacity = Integer.parseInt(
+        props.getOrDefault(CACHED_SCHEMA_MAP_CAPACITY, DEFAULT_CACHED_SCHEMA_MAP_CAPACITY));
+    SchemaRegistryClient schemaRegistryClient =
+        new CachedSchemaRegistryClient(createRestService(schemaRegistryUrl, props),
+            identityMapCapacity, Collections.singletonList(protobufSchemaProvider), props, null);
+    _deserializer = new KafkaProtobufDeserializer<>(schemaRegistryClient);
+    Preconditions.checkNotNull(topicName, "Topic must be provided");
+    _topicName = topicName;
+    _protoBufRecordExtractor = PluginManager.get().createInstance(ProtoBufRecordExtractor.class.getName());
+    _protoBufRecordExtractor.init(fieldsToRead, null);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    try {
+      Message protoMessage = _deserializer.deserialize(_topicName, payload);
+      return _protoBufRecordExtractor.extract(protoMessage, destination);
+    } catch (RuntimeException e) {
+      ignoreOrRethrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+  }
+
+  /**
+   * This method handles specific serialisation exceptions. If the exception cannot be ignored, the method
+   * re-throws it.
+   *
+   * @param e exception to handle
+   */
+  private void ignoreOrRethrowException(RuntimeException e) {
+    if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+      // Do nothing, the message is not a ProtoBuf message and can't be decoded
+      LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+      return;
+    }
+    throw e;
+  }
+
+  private boolean isUnknownMagicByte(Throwable e) {
+    return e != null && e instanceof SerializationException && e.getMessage() != null && e.getMessage().toLowerCase()
+        .contains("unknown magic byte");
+  }
+}
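[Configuration sketch, not part of this patch: createRestService() above strips the "schema.registry." prefix from any property other than the registry URL and matches the remainder against Kafka's client SSL ConfigDef, so a TLS-protected registry can be reached by prefixing standard Kafka SSL keys. A minimal sketch assuming placeholder paths and passwords:]

    import java.util.HashMap;
    import java.util.Map;

    public class SchemaRegistrySslPropsSketch {
      public static Map<String, String> sslProps() {
        Map<String, String> props = new HashMap<>();
        // Required registry endpoint (placeholder URL).
        props.put("schema.registry.rest.url", "https://schema-registry.example.com:8081");
        // After the "schema.registry." prefix is stripped, these map to Kafka SslConfigs keys;
        // keys that are not in the SSL ConfigDef are ignored by the decoder.
        props.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks"); // placeholder path
        props.put("schema.registry.ssl.truststore.password", "changeit"); // placeholder; PASSWORD-typed key
        return props;
      }
    }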
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
index db1995a6b2..fb0c5b3e8d 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
@@ -25,6 +25,7 @@ import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
@@ -78,11 +79,6 @@ public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
 
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    return decode(payload, 0, payload.length, destination);
-  }
-
-  @Override
-  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
     Message message;
     try {
       _builder.mergeFrom(payload);
@@ -96,4 +92,9 @@ public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
     _recordExtractor.extract(message, destination);
     return destination;
   }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+  }
 }
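[Illustration, not part of this patch: the hunk above is the offset/length fix called out in the commit message. Previously the offset-based overload fed the entire payload to the protobuf builder; it now copies only the requested range before delegating. A small sketch of the intended semantics, using a hypothetical payload with leading header bytes:]

    import java.util.Arrays;

    public class OffsetLengthSketch {
      public static void main(String[] args) {
        // Hypothetical stream payload: 4 header bytes followed by the serialized message bytes.
        byte[] payload = new byte[] {0, 0, 0, 1, 10, 20, 30};
        int offset = 4;
        int length = payload.length - offset;

        // What the fixed overload now hands to decode(byte[], GenericRow):
        byte[] messageBytes = Arrays.copyOfRange(payload, offset, offset + length);
        System.out.println(Arrays.toString(messageBytes)); // prints [10, 20, 30]
      }
    }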
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java
new file mode 100644
index 0000000000..ca0270ada4
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.plugin.inputformat.protobuf.kafka.schemaregistry.SchemaRegistryStarter;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ProtoBufConfluentSchemaTest {
+  public static final String TOPIC_PROTO = "test_topic_proto";
+  SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry;
+  private Producer<byte[], Message> _protoProducer;
+
+  @BeforeClass
+  public void setup() {
+    _schemaRegistry = SchemaRegistryStarter.startLocalInstance(9093);
+
+    Properties protoBufProducerProps = new Properties();
+    protoBufProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        _schemaRegistry._kafkaContainer.getBootstrapServers());
+    protoBufProducerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl());
+    protoBufProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    protoBufProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
+    _protoProducer = new KafkaProducer<>(protoBufProducerProps);
+  }
+
+  @Test
+  public void testSamplePinotConsumer()
+      throws Exception {
+    int numRecords = 10;
+    List<Sample.SampleRecord> recordList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      Sample.SampleRecord sampleRecord = Sample.SampleRecord.newBuilder().addFriends(UUID.randomUUID().toString())
+          .addFriends(UUID.randomUUID().toString()).setEmail(UUID.randomUUID().toString())
+          .setName(UUID.randomUUID().toString()).setId(i).build();
+
+      _protoProducer.send(new ProducerRecord<>(TOPIC_PROTO, sampleRecord));
+      recordList.add(sampleRecord);
+    }
+
+    Properties consumerProps = new Properties();
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _schemaRegistry._kafkaContainer.getBootstrapServers());
+    consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl());
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "foo_bar");
+    KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);
+    kafkaConsumer.subscribe(Collections.singletonList(TOPIC_PROTO));
+    ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
+    Iterator<ConsumerRecord<byte[], byte[]>> iter = consumerRecords.iterator();
+
+    KafkaConfluentSchemaRegistryProtoBufMessageDecoder decoder =
+        new KafkaConfluentSchemaRegistryProtoBufMessageDecoder();
+    Map<String, String> decoderProps = new HashMap<>();
+    decoderProps.put("schema.registry.rest.url", _schemaRegistry.getUrl());
+    decoder.init(decoderProps, null, TOPIC_PROTO);
+    GenericRow reuse = new GenericRow();
+    List<GenericRow> result = new ArrayList<>();
+    while (iter.hasNext()) {
+      byte[] arr = iter.next().value();
+      decoder.decode(arr, reuse);
+      result.add(reuse.copy());
+      reuse.clear();
+    }
+
+    Assert.assertEquals(result.size(), numRecords);
+
+    for (int i = 0; i < numRecords; i++) {
+      Sample.SampleRecord originalValue = recordList.get(i);
+      GenericRow decodedValue = result.get(i);
+
+      for (Map.Entry<Descriptors.FieldDescriptor, Object> fieldWithValue : originalValue.getAllFields().entrySet()) {
+        Assert.assertNotNull(decodedValue.getValue(fieldWithValue.getKey().getName()));
+        if (!fieldWithValue.getKey().isRepeated()) {
+          Assert.assertEquals(fieldWithValue.getValue(), decodedValue.getValue(fieldWithValue.getKey().getName()));
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _schemaRegistry.stop();
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java
new file mode 100644
index 0000000000..c6d7a59770
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf.kafka.schemaregistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+
+public class SchemaRegistryStarter {
+  public static final int DEFAULT_PORT = 8081;
+  private static final String CONFLUENT_PLATFORM_VERSION = "7.2.0";
+  private static final DockerImageName KAFKA_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION);
+  private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryStarter.class);
+
+  private SchemaRegistryStarter() {
+  }
+
+  public static KafkaSchemaRegistryInstance startLocalInstance(int port) {
+    KafkaSchemaRegistryInstance kafkaSchemaRegistry = new KafkaSchemaRegistryInstance(port);
+    kafkaSchemaRegistry.start();
+    return kafkaSchemaRegistry;
+  }
+
+  public static class KafkaSchemaRegistryInstance {
+    private final int _port;
+    public KafkaContainer _kafkaContainer;
+    private Network _network;
+    private GenericContainer _schemaRegistryContainer;
+
+    private KafkaSchemaRegistryInstance(int port) {
+      _port = port;
+    }
+
+    public String getUrl() {
+      return "http://" + _schemaRegistryContainer.getHost() + ":" + _schemaRegistryContainer.getMappedPort(_port);
+    }
+
+    public void start() {
+      LOGGER.info("Starting schema registry");
+      if (_kafkaContainer != null || _schemaRegistryContainer != null) {
+        throw new IllegalStateException("Schema registry is already running");
+      }
+
+      _network = Network.newNetwork();
+
+      _kafkaContainer = new KafkaContainer(KAFKA_DOCKER_IMAGE_NAME).withNetwork(_network).withNetworkAliases("kafka")
+          .withCreateContainerCmdModifier(it -> it.withHostName("kafka")).waitingFor(Wait.forListeningPort());
+      _kafkaContainer.start();
+
+      Map<String, String> schemaRegistryProps = new HashMap<>();
+      schemaRegistryProps.put("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9092");
+      schemaRegistryProps.put("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry");
+      schemaRegistryProps.put("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + _port);
schemaRegistryProps.put("SCHEMA_REGISTRY_DEBUG", "true"); + _schemaRegistryContainer = + new GenericContainer(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME).dependsOn(_kafkaContainer).withNetwork(_network) + .withNetworkAliases("schemaregistry").withEnv(schemaRegistryProps).withExposedPorts(_port) + .waitingFor(Wait.forListeningPort()); + _schemaRegistryContainer.start(); + } + + public void stop() { + LOGGER.info("Stopping schema registry"); + if (_schemaRegistryContainer != null) { + _schemaRegistryContainer.stop(); + _schemaRegistryContainer = null; + } + + if (_kafkaContainer != null) { + _kafkaContainer.stop(); + _kafkaContainer = null; + } + + if (_network != null) { + _network.close(); + } + } + } +} diff --git a/pom.xml b/pom.xml index b61f381f90..5876c72893 100644 --- a/pom.xml +++ b/pom.xml @@ -168,6 +168,7 @@ <kafka.version>2.0</kafka.version> <protobuf.version>3.19.2</protobuf.version> <grpc.version>1.41.0</grpc.version> + <confluent.version>5.5.3</confluent.version> <!-- Checkstyle violation prop.--> <checkstyle.violation.severity>warning</checkstyle.violation.severity> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org