This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit 5b0c70a46c280a17d8b775a0d35e5e3a00b6a8f2 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Oct 10 13:46:33 2023 +0200 Added an example of Apicurio Registry Usage with Kafka through YAML Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- jbang/kafka-apicurio-schema-registry/README.md | 81 +++ .../application.properties | 2 + .../kafka-apicurio-schema-registry/kafka-log.yaml | 42 ++ .../kafka-producer/pom.xml | 78 +++ .../kafka-producer/src/main/avro/order.avsc | 33 + .../main/java/com/acme/example/kafka/Produce.java | 70 +++ .../java/com/acme/example/kafka/models/Order.java | 673 +++++++++++++++++++++ 7 files changed, 979 insertions(+) diff --git a/jbang/kafka-apicurio-schema-registry/README.md b/jbang/kafka-apicurio-schema-registry/README.md new file mode 100644 index 0000000..5e60ee0 --- /dev/null +++ b/jbang/kafka-apicurio-schema-registry/README.md @@ -0,0 +1,81 @@ +# Example for consuming from Kafka with the usage of Apicurio Schema Registry and Avro + +You'll need a running Kafka instance and an Apicurio Registry + +## Kafka instance + +You could use a plain Kafka archive or use an Ansible role + +## Apicurio Registry + +```bash +docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:2.4.12.Final +``` + +This will run an Apicurio Registry instance with in-memory persistence. + +## Configure the applications + +In `application.properties` set the Kafka instance address and the Apicurio schema registry URL. + +## Produce to Kafka. + +Run [`Produce.java`](./kafka-producer/src/main/java/com/acme/example/kafka/Produce.java) to produce a message to the Kafka. + +```bash +mvn compile exec:java -Dexec.mainClass="com.acme.example.kafka.Produce" +``` + +## Consume from Kafka. + +To consume messages using a Camel route, first install the azure identity maven project: +```bash +cd kafka-producer +mvn clean install +``` +then run: +```bash +camel run kafka-log.yaml +``` + +You should see something like + +```bash +2023-10-10 13:44:09.810 INFO 71770 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (started:2) +2023-10-10 13:44:09.810 INFO 71770 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-to-log (kafka://my-topic) +2023-10-10 13:44:09.810 INFO 71770 --- [ main] el.impl.engine.AbstractCamelContext : Started log-sink-1 (kamelet://source) +2023-10-10 13:44:09.810 INFO 71770 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.0.1 (kafka-log) started in 187ms (build:0ms init:0ms start:187ms) +2023-10-10 13:44:10.018 WARN 71770 --- [sumer[my-topic]] fka.clients.consumer.ConsumerConfig : These configurations '[apicurio.registry.avroDatumProvider, apicurio.registry.url]' were supplied but are not used yet. +2023-10-10 13:44:10.019 INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.4.0 +2023-10-10 13:44:10.019 INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 2e1947d240607d53 +2023-10-10 13:44:10.019 INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1696938250018 +2023-10-10 13:44:10.023 INFO 71770 --- [sumer[my-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy +2023-10-10 13:44:10.023 INFO 71770 --- [sumer[my-topic]] l.component.kafka.KafkaFetchRecords : Subscribing my-topic-Thread 0 to topic my-topic +2023-10-10 13:44:10.024 INFO 71770 --- [sumer[my-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Subscribed to topic(s): my-topic +2023-10-10 13:44:10.254 INFO 71770 --- [sumer[my-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to PP5gKKwZTTOwYYvKftvhgA +2023-10-10 13:44:10.256 INFO 71770 --- [sumer[my-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Cluster ID: LGe3ByI8SLSis9Sm9zcCVg +2023-10-10 13:44:10.257 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null) +2023-10-10 13:44:10.263 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] (Re-)joining group +2023-10-10 13:44:10.276 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Request joining group due to: need to re-join with the given member-id: consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033 +2023-10-10 13:44:10.278 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) +2023-10-10 13:44:10.278 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] (Re-)joining group +2023-10-10 13:44:10.283 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Successfully joined group with generation Generation{generationId=19, memberId='consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033', protocol='range'} +2023-10-10 13:44:10.285 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Finished assignment for group at generation 19: {consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033=Assignment(partitions=[my-topic-0])} +2023-10-10 13:44:10.292 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Successfully synced group in generation Generation{generationId=19, memberId='consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033', protocol='range'} +2023-10-10 13:44:10.294 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Notifying assignor about the new Assignment(partitions=[my-topic-0]) +2023-10-10 13:44:10.298 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Adding newly assigned partitions: my-topic-0 +2023-10-10 13:44:10.314 INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Setting offset for partition my-topic-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}} +``` + +and after a message has been produced to Kafka you should see + +```bash +2023-10-10 13:44:10.519 INFO 71770 --- [sumer[my-topic]] log-sink : Exchange[ + ExchangePattern: InOnly + Headers: {apicurio.value.encoding=[B@474baada, apicurio.value.globalId=[B@28e32105, CamelMessageTimestamp=1696938203819, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = apicurio.value.globalId, value = [0, 0, 0, 0, 0, 0, 0, 3]), RecordHeader(key = apicurio.value.encoding, value = [66, 73, 78, 65, 82, 89])], isReadOnly = false), kafka.KEY=key, kafka.OFFSET=11, kafka.PARTITION=0, kafka.TIMESTAMP=1696938203819, kafka.TOPIC=my-topic} + BodyType: org.apache.avro.generic.GenericData.Record + Body: {"orderId": 1, "itemId": "item", "userId": "user", "quantity": 3.0, "description": "A really nice item I do love"} +] +``` + + diff --git a/jbang/kafka-apicurio-schema-registry/application.properties b/jbang/kafka-apicurio-schema-registry/application.properties new file mode 100644 index 0000000..d491e63 --- /dev/null +++ b/jbang/kafka-apicurio-schema-registry/application.properties @@ -0,0 +1,2 @@ +bootstrap.servers=localhost:9092 +topic=my-topic diff --git a/jbang/kafka-apicurio-schema-registry/kafka-log.yaml b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml new file mode 100644 index 0000000..9ca2187 --- /dev/null +++ b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml @@ -0,0 +1,42 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# camel-k: dependency=mvn:com.acme.example:kafka-apicurio-producer:0.1 +# camel-k: dependency=mvn:io.apicurio:apicurio-registry-serdes-avro-serde:2.4.12.Final + +- beans: + - name: order + type: "#class:com.acme.example.kafka.models.Order" + +- route: + id: "kafka-to-log" + from: + uri: "kafka:{{topic}}" + parameters: + autoOffsetReset: earliest + brokers: "{{bootstrap.servers}}" + groupId: 'my-consumer-group' + valueDeserializer: 'io.apicurio.registry.serde.avro.AvroKafkaDeserializer' + additionalProperties.apicurio.registry.url: 'http://localhost:8080/apis/registry/v2' + additionalProperties.apicurio.registry.avro-datum-provider: 'io.apicurio.registry.serde.avro.ReflectAvroDatumProvider' + steps: + - to: + uri: "kamelet:log-sink" + parameters: + showStreams: true + showHeaders: true + multiline: true diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml b/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml new file mode 100644 index 0000000..fb16f3d --- /dev/null +++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml @@ -0,0 +1,78 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.acme.example</groupId> + <artifactId>kafka-apicurio-producer</artifactId> + <packaging>jar</packaging> + <version>0.1</version> + <name>kafka-apicurio-prod</name> + <url>http://maven.apache.org</url> + <dependencies> + + <dependency> + <groupId>io.apicurio</groupId> + <artifactId>apicurio-registry-serdes-avro-serde</artifactId> + <version>2.4.12.Final</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.4.0</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.11.3</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>2.0.7</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>2.0.7</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.11.0</version> + <configuration> + <source>17</source> + <target>17</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>1.11.3</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>src/main/avro</sourceDirectory> + <outputDirectory>${project.basedir}/src/main/java</outputDirectory> + <stringType>String</stringType> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc new file mode 100644 index 0000000..e41753e --- /dev/null +++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc @@ -0,0 +1,33 @@ +{ + "doc": "Fact schema of an order", + "fields": [ + { + "doc": "Unique id of the order.", + "name": "orderId", + "type": "int" + }, + { + "doc": "Id of the ordered item.", + "name": "itemId", + "type": "string" + }, + { + "doc": "Id of the user who ordered the item.", + "name": "userId", + "type": "string" + }, + { + "doc": "How much was ordered.", + "name": "quantity", + "type": "double" + }, + { + "doc": "Description of item.", + "name": "description", + "type": "string" + } + ], + "name": "Order", + "namespace": "com.acme.example.kafka.models", + "type": "record" +} diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java new file mode 100644 index 0000000..856c903 --- /dev/null +++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.acme.example.kafka; + +import com.acme.example.kafka.models.Order; +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerializer; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Produce { + + private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2"; + public static final String DEFAULT_PROPERTIES_PATH = "../application.properties"; + + public static void main(String[] args) throws IOException { + String propertiesPath = DEFAULT_PROPERTIES_PATH; + if (args.length >= 1) { + propertiesPath = args[0]; + } + + Properties properties = new Properties(); + properties.load(Files.newInputStream(Paths.get(propertiesPath))); + + properties.put(SerdeConfig.REGISTRY_URL, REGISTRY_URL); + properties.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class); + + properties.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName()); + + try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) { + Order order = new Order(1, "item", "user", 3.0, "A really nice item I do love"); + String topic = properties.getProperty("topic"); + ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order); + RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS); + System.out.println("Sent record with offset " + result.offset()); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + e.printStackTrace(); + System.exit(1); + } + } +} diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java new file mode 100644 index 0000000..c559447 --- /dev/null +++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java @@ -0,0 +1,673 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package com.acme.example.kafka.models; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +/** Fact schema of an order */ +@org.apache.avro.specific.AvroGenerated +public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -8676937297320921983L; + + + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.acme.example.kafka.models\",\"doc\":\"Fact schema of an order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"int\",\"doc\":\"Unique id of the order.\"},{\"name\":\"itemId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Id of the ordered item.\"},{\"name\":\"userId\",\"type\":{\"type\":\"string\",\"avro.jav [...] + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static final SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder<Order> ENCODER = + new BinaryMessageEncoder<>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder<Order> DECODER = + new BinaryMessageDecoder<>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder<Order> getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder<Order> getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder<Order> createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this Order to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a Order from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a Order instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static Order fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + /** Unique id of the order. */ + private int orderId; + /** Id of the ordered item. */ + private java.lang.String itemId; + /** Id of the user who ordered the item. */ + private java.lang.String userId; + /** How much was ordered. */ + private double quantity; + /** Description of item. */ + private java.lang.String description; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use <code>newBuilder()</code>. + */ + public Order() {} + + /** + * All-args constructor. + * @param orderId Unique id of the order. + * @param itemId Id of the ordered item. + * @param userId Id of the user who ordered the item. + * @param quantity How much was ordered. + * @param description Description of item. + */ + public Order(java.lang.Integer orderId, java.lang.String itemId, java.lang.String userId, java.lang.Double quantity, java.lang.String description) { + this.orderId = orderId; + this.itemId = itemId; + this.userId = userId; + this.quantity = quantity; + this.description = description; + } + + @Override + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + + @Override + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + + // Used by DatumWriter. Applications should not call. + @Override + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return orderId; + case 1: return itemId; + case 2: return userId; + case 3: return quantity; + case 4: return description; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: orderId = (java.lang.Integer)value$; break; + case 1: itemId = value$ != null ? value$.toString() : null; break; + case 2: userId = value$ != null ? value$.toString() : null; break; + case 3: quantity = (java.lang.Double)value$; break; + case 4: description = value$ != null ? value$.toString() : null; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'orderId' field. + * @return Unique id of the order. + */ + public int getOrderId() { + return orderId; + } + + + /** + * Sets the value of the 'orderId' field. + * Unique id of the order. + * @param value the value to set. + */ + public void setOrderId(int value) { + this.orderId = value; + } + + /** + * Gets the value of the 'itemId' field. + * @return Id of the ordered item. + */ + public java.lang.String getItemId() { + return itemId; + } + + + /** + * Sets the value of the 'itemId' field. + * Id of the ordered item. + * @param value the value to set. + */ + public void setItemId(java.lang.String value) { + this.itemId = value; + } + + /** + * Gets the value of the 'userId' field. + * @return Id of the user who ordered the item. + */ + public java.lang.String getUserId() { + return userId; + } + + + /** + * Sets the value of the 'userId' field. + * Id of the user who ordered the item. + * @param value the value to set. + */ + public void setUserId(java.lang.String value) { + this.userId = value; + } + + /** + * Gets the value of the 'quantity' field. + * @return How much was ordered. + */ + public double getQuantity() { + return quantity; + } + + + /** + * Sets the value of the 'quantity' field. + * How much was ordered. + * @param value the value to set. + */ + public void setQuantity(double value) { + this.quantity = value; + } + + /** + * Gets the value of the 'description' field. + * @return Description of item. + */ + public java.lang.String getDescription() { + return description; + } + + + /** + * Sets the value of the 'description' field. + * Description of item. + * @param value the value to set. + */ + public void setDescription(java.lang.String value) { + this.description = value; + } + + /** + * Creates a new Order RecordBuilder. + * @return A new Order RecordBuilder + */ + public static com.acme.example.kafka.models.Order.Builder newBuilder() { + return new com.acme.example.kafka.models.Order.Builder(); + } + + /** + * Creates a new Order RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new Order RecordBuilder + */ + public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order.Builder other) { + if (other == null) { + return new com.acme.example.kafka.models.Order.Builder(); + } else { + return new com.acme.example.kafka.models.Order.Builder(other); + } + } + + /** + * Creates a new Order RecordBuilder by copying an existing Order instance. + * @param other The existing instance to copy. + * @return A new Order RecordBuilder + */ + public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order other) { + if (other == null) { + return new com.acme.example.kafka.models.Order.Builder(); + } else { + return new com.acme.example.kafka.models.Order.Builder(other); + } + } + + /** + * RecordBuilder for Order instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Order> + implements org.apache.avro.data.RecordBuilder<Order> { + + /** Unique id of the order. */ + private int orderId; + /** Id of the ordered item. */ + private java.lang.String itemId; + /** Id of the user who ordered the item. */ + private java.lang.String userId; + /** How much was ordered. */ + private double quantity; + /** Description of item. */ + private java.lang.String description; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(com.acme.example.kafka.models.Order.Builder other) { + super(other); + if (isValidValue(fields()[0], other.orderId)) { + this.orderId = data().deepCopy(fields()[0].schema(), other.orderId); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.itemId)) { + this.itemId = data().deepCopy(fields()[1].schema(), other.itemId); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (isValidValue(fields()[2], other.userId)) { + this.userId = data().deepCopy(fields()[2].schema(), other.userId); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + if (isValidValue(fields()[3], other.quantity)) { + this.quantity = data().deepCopy(fields()[3].schema(), other.quantity); + fieldSetFlags()[3] = other.fieldSetFlags()[3]; + } + if (isValidValue(fields()[4], other.description)) { + this.description = data().deepCopy(fields()[4].schema(), other.description); + fieldSetFlags()[4] = other.fieldSetFlags()[4]; + } + } + + /** + * Creates a Builder by copying an existing Order instance + * @param other The existing instance to copy. + */ + private Builder(com.acme.example.kafka.models.Order other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.orderId)) { + this.orderId = data().deepCopy(fields()[0].schema(), other.orderId); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.itemId)) { + this.itemId = data().deepCopy(fields()[1].schema(), other.itemId); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.userId)) { + this.userId = data().deepCopy(fields()[2].schema(), other.userId); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.quantity)) { + this.quantity = data().deepCopy(fields()[3].schema(), other.quantity); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.description)) { + this.description = data().deepCopy(fields()[4].schema(), other.description); + fieldSetFlags()[4] = true; + } + } + + /** + * Gets the value of the 'orderId' field. + * Unique id of the order. + * @return The value. + */ + public int getOrderId() { + return orderId; + } + + + /** + * Sets the value of the 'orderId' field. + * Unique id of the order. + * @param value The value of 'orderId'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setOrderId(int value) { + validate(fields()[0], value); + this.orderId = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'orderId' field has been set. + * Unique id of the order. + * @return True if the 'orderId' field has been set, false otherwise. + */ + public boolean hasOrderId() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'orderId' field. + * Unique id of the order. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearOrderId() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'itemId' field. + * Id of the ordered item. + * @return The value. + */ + public java.lang.String getItemId() { + return itemId; + } + + + /** + * Sets the value of the 'itemId' field. + * Id of the ordered item. + * @param value The value of 'itemId'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setItemId(java.lang.String value) { + validate(fields()[1], value); + this.itemId = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'itemId' field has been set. + * Id of the ordered item. + * @return True if the 'itemId' field has been set, false otherwise. + */ + public boolean hasItemId() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'itemId' field. + * Id of the ordered item. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearItemId() { + itemId = null; + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'userId' field. + * Id of the user who ordered the item. + * @return The value. + */ + public java.lang.String getUserId() { + return userId; + } + + + /** + * Sets the value of the 'userId' field. + * Id of the user who ordered the item. + * @param value The value of 'userId'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setUserId(java.lang.String value) { + validate(fields()[2], value); + this.userId = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'userId' field has been set. + * Id of the user who ordered the item. + * @return True if the 'userId' field has been set, false otherwise. + */ + public boolean hasUserId() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'userId' field. + * Id of the user who ordered the item. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearUserId() { + userId = null; + fieldSetFlags()[2] = false; + return this; + } + + /** + * Gets the value of the 'quantity' field. + * How much was ordered. + * @return The value. + */ + public double getQuantity() { + return quantity; + } + + + /** + * Sets the value of the 'quantity' field. + * How much was ordered. + * @param value The value of 'quantity'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setQuantity(double value) { + validate(fields()[3], value); + this.quantity = value; + fieldSetFlags()[3] = true; + return this; + } + + /** + * Checks whether the 'quantity' field has been set. + * How much was ordered. + * @return True if the 'quantity' field has been set, false otherwise. + */ + public boolean hasQuantity() { + return fieldSetFlags()[3]; + } + + + /** + * Clears the value of the 'quantity' field. + * How much was ordered. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearQuantity() { + fieldSetFlags()[3] = false; + return this; + } + + /** + * Gets the value of the 'description' field. + * Description of item. + * @return The value. + */ + public java.lang.String getDescription() { + return description; + } + + + /** + * Sets the value of the 'description' field. + * Description of item. + * @param value The value of 'description'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setDescription(java.lang.String value) { + validate(fields()[4], value); + this.description = value; + fieldSetFlags()[4] = true; + return this; + } + + /** + * Checks whether the 'description' field has been set. + * Description of item. + * @return True if the 'description' field has been set, false otherwise. + */ + public boolean hasDescription() { + return fieldSetFlags()[4]; + } + + + /** + * Clears the value of the 'description' field. + * Description of item. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearDescription() { + description = null; + fieldSetFlags()[4] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Order build() { + try { + Order record = new Order(); + record.orderId = fieldSetFlags()[0] ? this.orderId : (java.lang.Integer) defaultValue(fields()[0]); + record.itemId = fieldSetFlags()[1] ? this.itemId : (java.lang.String) defaultValue(fields()[1]); + record.userId = fieldSetFlags()[2] ? this.userId : (java.lang.String) defaultValue(fields()[2]); + record.quantity = fieldSetFlags()[3] ? this.quantity : (java.lang.Double) defaultValue(fields()[3]); + record.description = fieldSetFlags()[4] ? this.description : (java.lang.String) defaultValue(fields()[4]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter<Order> + WRITER$ = (org.apache.avro.io.DatumWriter<Order>)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader<Order> + READER$ = (org.apache.avro.io.DatumReader<Order>)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeInt(this.orderId); + + out.writeString(this.itemId); + + out.writeString(this.userId); + + out.writeDouble(this.quantity); + + out.writeString(this.description); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.orderId = in.readInt(); + + this.itemId = in.readString(); + + this.userId = in.readString(); + + this.quantity = in.readDouble(); + + this.description = in.readString(); + + } else { + for (int i = 0; i < 5; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.orderId = in.readInt(); + break; + + case 1: + this.itemId = in.readString(); + break; + + case 2: + this.userId = in.readString(); + break; + + case 3: + this.quantity = in.readDouble(); + break; + + case 4: + this.description = in.readString(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + +