This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 5b0c70a46c280a17d8b775a0d35e5e3a00b6a8f2
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Oct 10 13:46:33 2023 +0200

    Added an example of Apicurio Registry Usage with Kafka through YAML
    
    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 jbang/kafka-apicurio-schema-registry/README.md     |  81 +++
 .../application.properties                         |   2 +
 .../kafka-apicurio-schema-registry/kafka-log.yaml  |  42 ++
 .../kafka-producer/pom.xml                         |  78 +++
 .../kafka-producer/src/main/avro/order.avsc        |  33 +
 .../main/java/com/acme/example/kafka/Produce.java  |  70 +++
 .../java/com/acme/example/kafka/models/Order.java  | 673 +++++++++++++++++++++
 7 files changed, 979 insertions(+)

diff --git a/jbang/kafka-apicurio-schema-registry/README.md b/jbang/kafka-apicurio-schema-registry/README.md
new file mode 100644
index 0000000..5e60ee0
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/README.md
@@ -0,0 +1,81 @@
+# Example for consuming from Kafka with Apicurio Schema Registry and Avro
+
+You'll need a running Kafka instance and an Apicurio Registry.
+
+## Kafka instance
+
+You can use a plain Kafka archive or an Ansible role to provision the broker.
+
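The plain-archive option can be sketched as follows (the version number and single-node ZooKeeper setup are illustrative, not prescribed by this example):

```bash
# Extract a plain Kafka archive and start a single-node broker
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# Create the topic this example uses
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
```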
+## Apicurio Registry
+
+```bash
+docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:2.4.12.Final
+```
+
+This will run an Apicurio Registry instance with in-memory persistence.
+
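To confirm the registry is reachable (assuming the default port mapping above), you can query its v2 REST API:

```bash
# Lists registered artifacts; returns an empty result set on a fresh registry
curl -s http://localhost:8080/apis/registry/v2/search/artifacts
```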
+## Configure the applications
+
+In `application.properties`, set the Kafka bootstrap servers address and the topic name. The Apicurio Registry URL is configured in `kafka-log.yaml` and `Produce.java`.
+
+## Produce to Kafka
+
+Run [`Produce.java`](./kafka-producer/src/main/java/com/acme/example/kafka/Produce.java) to produce a message to the Kafka topic.
+
+```bash
+mvn compile exec:java -Dexec.mainClass="com.acme.example.kafka.Produce"
+```
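`Produce.java` also accepts an optional first argument overriding the default `../application.properties` path, so you can point it at another configuration file (the path below is a placeholder):

```bash
mvn compile exec:java -Dexec.mainClass="com.acme.example.kafka.Produce" \
  -Dexec.args="/path/to/other/application.properties"
```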
+
+## Consume from Kafka
+
+To consume messages using a Camel route, first install the `kafka-producer` Maven project (it provides the `Order` model used by the route):
+```bash
+cd kafka-producer
+mvn clean install
+```
+then run:
+```bash
+camel run kafka-log.yaml
+```
+
+You should see something like:
+
+```bash
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (started:2)
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-to-log (kafka://my-topic)
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext :     Started log-sink-1 (kamelet://source)
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.0.1 (kafka-log) started in 187ms (build:0ms init:0ms start:187ms)
+2023-10-10 13:44:10.018  WARN 71770 --- [sumer[my-topic]] fka.clients.consumer.ConsumerConfig : These configurations '[apicurio.registry.avroDatumProvider, apicurio.registry.url]' were supplied but are not used yet.
+2023-10-10 13:44:10.019  INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.4.0
+2023-10-10 13:44:10.019  INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 2e1947d240607d53
+2023-10-10 13:44:10.019  INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1696938250018
+2023-10-10 13:44:10.023  INFO 71770 --- [sumer[my-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2023-10-10 13:44:10.023  INFO 71770 --- [sumer[my-topic]] l.component.kafka.KafkaFetchRecords : Subscribing my-topic-Thread 0 to topic my-topic
+2023-10-10 13:44:10.024  INFO 71770 --- [sumer[my-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Subscribed to topic(s): my-topic
+2023-10-10 13:44:10.254  INFO 71770 --- [sumer[my-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to PP5gKKwZTTOwYYvKftvhgA
+2023-10-10 13:44:10.256  INFO 71770 --- [sumer[my-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Cluster ID: LGe3ByI8SLSis9Sm9zcCVg
+2023-10-10 13:44:10.257  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2023-10-10 13:44:10.263  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] (Re-)joining group
+2023-10-10 13:44:10.276  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Request joining group due to: need to re-join with the given member-id: consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033
+2023-10-10 13:44:10.278  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2023-10-10 13:44:10.278  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] (Re-)joining group
+2023-10-10 13:44:10.283  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Successfully joined group with generation Generation{generationId=19, memberId='consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033', protocol='range'}
+2023-10-10 13:44:10.285  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Finished assignment for group at generation 19: {consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033=Assignment(partitions=[my-topic-0])}
+2023-10-10 13:44:10.292  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Successfully synced group in generation Generation{generationId=19, memberId='consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033', protocol='range'}
+2023-10-10 13:44:10.294  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Notifying assignor about the new Assignment(partitions=[my-topic-0])
+2023-10-10 13:44:10.298  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Adding newly assigned partitions: my-topic-0
+2023-10-10 13:44:10.314  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Setting offset for partition my-topic-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}
+```
+
+and after a message has been produced to Kafka, you should see:
+
+```bash
+2023-10-10 13:44:10.519  INFO 71770 --- [sumer[my-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {apicurio.value.encoding=[B@474baada, apicurio.value.globalId=[B@28e32105, CamelMessageTimestamp=1696938203819, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = apicurio.value.globalId, value = [0, 0, 0, 0, 0, 0, 0, 3]), RecordHeader(key = apicurio.value.encoding, value = [66, 73, 78, 65, 82, 89])], isReadOnly = false), kafka.KEY=key, kafka.OFFSET=11, kafka.PARTITION=0, kafka.TIMESTAMP=1696938203819, kafka.TOPIC=my-topic}
+  BodyType: org.apache.avro.generic.GenericData.Record
+  Body: {"orderId": 1, "itemId": "item", "userId": "user", "quantity": 3.0, "description": "A really nice item I do love"}
+]
+```
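The `apicurio.value.encoding` header arrives as a raw byte array; decoding it as ASCII reveals the encoding name the serializer recorded. A minimal sketch (the class name `DecodeHeader` is just for illustration):

```java
import java.nio.charset.StandardCharsets;

public class DecodeHeader {
    public static void main(String[] args) {
        // Bytes from the RecordHeader 'apicurio.value.encoding' shown above
        byte[] value = {66, 73, 78, 65, 82, 89};
        // Decoding as ASCII yields the encoding name
        System.out.println(new String(value, StandardCharsets.US_ASCII)); // prints BINARY
    }
}
```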
+
+
diff --git a/jbang/kafka-apicurio-schema-registry/application.properties b/jbang/kafka-apicurio-schema-registry/application.properties
new file mode 100644
index 0000000..d491e63
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/application.properties
@@ -0,0 +1,2 @@
+bootstrap.servers=localhost:9092
+topic=my-topic
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-log.yaml b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml
new file mode 100644
index 0000000..9ca2187
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml
@@ -0,0 +1,42 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# camel-k: dependency=mvn:com.acme.example:kafka-apicurio-producer:0.1
+# camel-k: dependency=mvn:io.apicurio:apicurio-registry-serdes-avro-serde:2.4.12.Final
+
+- beans:
+  - name: order
+    type: "#class:com.acme.example.kafka.models.Order"
+
+- route:
+    id: "kafka-to-log"
+    from:
+      uri: "kafka:{{topic}}"
+      parameters:
+        autoOffsetReset: earliest
+        brokers: "{{bootstrap.servers}}"
+        groupId: 'my-consumer-group'
+        valueDeserializer: 'io.apicurio.registry.serde.avro.AvroKafkaDeserializer'
+        additionalProperties.apicurio.registry.url: 'http://localhost:8080/apis/registry/v2'
+        additionalProperties.apicurio.registry.avro-datum-provider: 'io.apicurio.registry.serde.avro.ReflectAvroDatumProvider'
+      steps:
+        - to:
+            uri: "kamelet:log-sink"
+            parameters:
+              showStreams: true
+              showHeaders: true
+              multiline: true
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml b/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml
new file mode 100644
index 0000000..fb16f3d
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml
@@ -0,0 +1,78 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.acme.example</groupId>
+  <artifactId>kafka-apicurio-producer</artifactId>
+  <packaging>jar</packaging>
+  <version>0.1</version>
+  <name>kafka-apicurio-prod</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+
+    <dependency>
+      <groupId>io.apicurio</groupId>
+      <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+      <version>2.4.12.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.11.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>2.0.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>2.0.7</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.11.0</version>
+        <configuration>
+          <source>17</source>
+          <target>17</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.11.3</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>src/main/avro</sourceDirectory>
+              <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
+              <stringType>String</stringType>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc
new file mode 100644
index 0000000..e41753e
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc
@@ -0,0 +1,33 @@
+{
+  "doc": "Fact schema of an order",
+  "fields": [
+    {
+      "doc": "Unique id of the order.",
+      "name": "orderId",
+      "type": "int"
+    },
+    {
+      "doc": "Id of the ordered item.",
+      "name": "itemId",
+      "type": "string"
+    },
+    {
+      "doc": "Id of the user who ordered the item.",
+      "name": "userId",
+      "type": "string"
+    },
+    {
+      "doc": "How much was ordered.",
+      "name": "quantity",
+      "type": "double"
+    },
+    {
+      "doc": "Description of item.",
+      "name": "description",
+      "type": "string"
+    }
+  ],
+  "name": "Order",
+  "namespace": "com.acme.example.kafka.models",
+  "type": "record"
+}
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java
new file mode 100644
index 0000000..856c903
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.acme.example.kafka;
+
+import com.acme.example.kafka.models.Order;
+import io.apicurio.registry.serde.SerdeConfig;
+import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
+import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class Produce {
+
+    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
+    public static final String DEFAULT_PROPERTIES_PATH = "../application.properties";
+
+    public static void main(String[] args) throws IOException {
+        String propertiesPath = DEFAULT_PROPERTIES_PATH;
+        if (args.length >= 1) {
+            propertiesPath = args[0];
+        }
+
+        Properties properties = new Properties();
+        properties.load(Files.newInputStream(Paths.get(propertiesPath)));
+
+        properties.put(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
+        properties.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class);
+
+        properties.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
+
+        try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) {
+            Order order = new Order(1, "item", "user", 3.0, "A really nice item I do love");
+            String topic = properties.getProperty("topic");
+            ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order);
+            RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
+            System.out.println("Sent record with offset " + result.offset());
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+}
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java
new file mode 100644
index 0000000..c559447
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java
@@ -0,0 +1,673 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package com.acme.example.kafka.models;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.SchemaStore;
+
+/** Fact schema of an order */
+@org.apache.avro.specific.AvroGenerated
+public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  private static final long serialVersionUID = -8676937297320921983L;
+
+
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.acme.example.kafka.models\",\"doc\":\"Fact schema of an order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"int\",\"doc\":\"Unique id of the order.\"},{\"name\":\"itemId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Id of the ordered item.\"},{\"name\":\"userId\",\"type\":{\"type\":\"string\",\"avro.jav [...]
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+  private static final SpecificData MODEL$ = new SpecificData();
+
+  private static final BinaryMessageEncoder<Order> ENCODER =
+      new BinaryMessageEncoder<>(MODEL$, SCHEMA$);
+
+  private static final BinaryMessageDecoder<Order> DECODER =
+      new BinaryMessageDecoder<>(MODEL$, SCHEMA$);
+
+  /**
+   * Return the BinaryMessageEncoder instance used by this class.
+   * @return the message encoder used by this class
+   */
+  public static BinaryMessageEncoder<Order> getEncoder() {
+    return ENCODER;
+  }
+
+  /**
+   * Return the BinaryMessageDecoder instance used by this class.
+   * @return the message decoder used by this class
+   */
+  public static BinaryMessageDecoder<Order> getDecoder() {
+    return DECODER;
+  }
+
+  /**
+   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
+   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+   * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore
+   */
+  public static BinaryMessageDecoder<Order> createDecoder(SchemaStore resolver) {
+    return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver);
+  }
+
+  /**
+   * Serializes this Order to a ByteBuffer.
+   * @return a buffer holding the serialized data for this instance
+   * @throws java.io.IOException if this instance could not be serialized
+   */
+  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+    return ENCODER.encode(this);
+  }
+
+  /**
+   * Deserializes a Order from a ByteBuffer.
+   * @param b a byte buffer holding serialized data for an instance of this class
+   * @return a Order instance decoded from the given buffer
+   * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class
+   */
+  public static Order fromByteBuffer(
+      java.nio.ByteBuffer b) throws java.io.IOException {
+    return DECODER.decode(b);
+  }
+
+  /** Unique id of the order. */
+  private int orderId;
+  /** Id of the ordered item. */
+  private java.lang.String itemId;
+  /** Id of the user who ordered the item. */
+  private java.lang.String userId;
+  /** How much was ordered. */
+  private double quantity;
+  /** Description of item. */
+  private java.lang.String description;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use <code>newBuilder()</code>.
+   */
+  public Order() {}
+
+  /**
+   * All-args constructor.
+   * @param orderId Unique id of the order.
+   * @param itemId Id of the ordered item.
+   * @param userId Id of the user who ordered the item.
+   * @param quantity How much was ordered.
+   * @param description Description of item.
+   */
+  public Order(java.lang.Integer orderId, java.lang.String itemId, java.lang.String userId, java.lang.Double quantity, java.lang.String description) {
+    this.orderId = orderId;
+    this.itemId = itemId;
+    this.userId = userId;
+    this.quantity = quantity;
+    this.description = description;
+  }
+
+  @Override
+  public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }
+
+  @Override
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+
+  // Used by DatumWriter.  Applications should not call.
+  @Override
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return orderId;
+    case 1: return itemId;
+    case 2: return userId;
+    case 3: return quantity;
+    case 4: return description;
+    default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
+    }
+  }
+
+  // Used by DatumReader.  Applications should not call.
+  @Override
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: orderId = (java.lang.Integer)value$; break;
+    case 1: itemId = value$ != null ? value$.toString() : null; break;
+    case 2: userId = value$ != null ? value$.toString() : null; break;
+    case 3: quantity = (java.lang.Double)value$; break;
+    case 4: description = value$ != null ? value$.toString() : null; break;
+    default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
+    }
+  }
+
+  /**
+   * Gets the value of the 'orderId' field.
+   * @return Unique id of the order.
+   */
+  public int getOrderId() {
+    return orderId;
+  }
+
+
+  /**
+   * Sets the value of the 'orderId' field.
+   * Unique id of the order.
+   * @param value the value to set.
+   */
+  public void setOrderId(int value) {
+    this.orderId = value;
+  }
+
+  /**
+   * Gets the value of the 'itemId' field.
+   * @return Id of the ordered item.
+   */
+  public java.lang.String getItemId() {
+    return itemId;
+  }
+
+
+  /**
+   * Sets the value of the 'itemId' field.
+   * Id of the ordered item.
+   * @param value the value to set.
+   */
+  public void setItemId(java.lang.String value) {
+    this.itemId = value;
+  }
+
+  /**
+   * Gets the value of the 'userId' field.
+   * @return Id of the user who ordered the item.
+   */
+  public java.lang.String getUserId() {
+    return userId;
+  }
+
+
+  /**
+   * Sets the value of the 'userId' field.
+   * Id of the user who ordered the item.
+   * @param value the value to set.
+   */
+  public void setUserId(java.lang.String value) {
+    this.userId = value;
+  }
+
+  /**
+   * Gets the value of the 'quantity' field.
+   * @return How much was ordered.
+   */
+  public double getQuantity() {
+    return quantity;
+  }
+
+
+  /**
+   * Sets the value of the 'quantity' field.
+   * How much was ordered.
+   * @param value the value to set.
+   */
+  public void setQuantity(double value) {
+    this.quantity = value;
+  }
+
+  /**
+   * Gets the value of the 'description' field.
+   * @return Description of item.
+   */
+  public java.lang.String getDescription() {
+    return description;
+  }
+
+
+  /**
+   * Sets the value of the 'description' field.
+   * Description of item.
+   * @param value the value to set.
+   */
+  public void setDescription(java.lang.String value) {
+    this.description = value;
+  }
+
+  /**
+   * Creates a new Order RecordBuilder.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.kafka.models.Order.Builder newBuilder() {
+    return new com.acme.example.kafka.models.Order.Builder();
+  }
+
+  /**
+   * Creates a new Order RecordBuilder by copying an existing Builder.
+   * @param other The existing builder to copy.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order.Builder other) {
+    if (other == null) {
+      return new com.acme.example.kafka.models.Order.Builder();
+    } else {
+      return new com.acme.example.kafka.models.Order.Builder(other);
+    }
+  }
+
+  /**
+   * Creates a new Order RecordBuilder by copying an existing Order instance.
+   * @param other The existing instance to copy.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order other) {
+    if (other == null) {
+      return new com.acme.example.kafka.models.Order.Builder();
+    } else {
+      return new com.acme.example.kafka.models.Order.Builder(other);
+    }
+  }
+
+  /**
+   * RecordBuilder for Order instances.
+   */
+  @org.apache.avro.specific.AvroGenerated
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Order>
+    implements org.apache.avro.data.RecordBuilder<Order> {
+
+    /** Unique id of the order. */
+    private int orderId;
+    /** Id of the ordered item. */
+    private java.lang.String itemId;
+    /** Id of the user who ordered the item. */
+    private java.lang.String userId;
+    /** How much was ordered. */
+    private double quantity;
+    /** Description of item. */
+    private java.lang.String description;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(SCHEMA$, MODEL$);
+    }
+
+    /**
+     * Creates a Builder by copying an existing Builder.
+     * @param other The existing Builder to copy.
+     */
+    private Builder(com.acme.example.kafka.models.Order.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.orderId)) {
+        this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+        fieldSetFlags()[0] = other.fieldSetFlags()[0];
+      }
+      if (isValidValue(fields()[1], other.itemId)) {
+        this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+        fieldSetFlags()[1] = other.fieldSetFlags()[1];
+      }
+      if (isValidValue(fields()[2], other.userId)) {
+        this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+        fieldSetFlags()[2] = other.fieldSetFlags()[2];
+      }
+      if (isValidValue(fields()[3], other.quantity)) {
+        this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+        fieldSetFlags()[3] = other.fieldSetFlags()[3];
+      }
+      if (isValidValue(fields()[4], other.description)) {
+        this.description = data().deepCopy(fields()[4].schema(), other.description);
+        fieldSetFlags()[4] = other.fieldSetFlags()[4];
+      }
+    }
+
+    /**
+     * Creates a Builder by copying an existing Order instance
+     * @param other The existing instance to copy.
+     */
+    private Builder(com.acme.example.kafka.models.Order other) {
+      super(SCHEMA$, MODEL$);
+      if (isValidValue(fields()[0], other.orderId)) {
+        this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.itemId)) {
+        this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.userId)) {
+        this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.quantity)) {
+        this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.description)) {
+        this.description = data().deepCopy(fields()[4].schema(), other.description);
+        fieldSetFlags()[4] = true;
+      }
+    }
+
+    /**
+      * Gets the value of the 'orderId' field.
+      * Unique id of the order.
+      * @return The value.
+      */
+    public int getOrderId() {
+      return orderId;
+    }
+
+
+    /**
+      * Sets the value of the 'orderId' field.
+      * Unique id of the order.
+      * @param value The value of 'orderId'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setOrderId(int value) {
+      validate(fields()[0], value);
+      this.orderId = value;
+      fieldSetFlags()[0] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'orderId' field has been set.
+      * Unique id of the order.
+      * @return True if the 'orderId' field has been set, false otherwise.
+      */
+    public boolean hasOrderId() {
+      return fieldSetFlags()[0];
+    }
+
+
+    /**
+      * Clears the value of the 'orderId' field.
+      * Unique id of the order.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearOrderId() {
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @return The value.
+      */
+    public java.lang.String getItemId() {
+      return itemId;
+    }
+
+
+    /**
+      * Sets the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @param value The value of 'itemId'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setItemId(java.lang.String value) {
+      validate(fields()[1], value);
+      this.itemId = value;
+      fieldSetFlags()[1] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'itemId' field has been set.
+      * Id of the ordered item.
+      * @return True if the 'itemId' field has been set, false otherwise.
+      */
+    public boolean hasItemId() {
+      return fieldSetFlags()[1];
+    }
+
+
+    /**
+      * Clears the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearItemId() {
+      itemId = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @return The value.
+      */
+    public java.lang.String getUserId() {
+      return userId;
+    }
+
+
+    /**
+      * Sets the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @param value The value of 'userId'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setUserId(java.lang.String value) {
+      validate(fields()[2], value);
+      this.userId = value;
+      fieldSetFlags()[2] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'userId' field has been set.
+      * Id of the user who ordered the item.
+      * @return True if the 'userId' field has been set, false otherwise.
+      */
+    public boolean hasUserId() {
+      return fieldSetFlags()[2];
+    }
+
+
+    /**
+      * Clears the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearUserId() {
+      userId = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'quantity' field.
+      * How much was ordered.
+      * @return The value.
+      */
+    public double getQuantity() {
+      return quantity;
+    }
+
+
+    /**
+      * Sets the value of the 'quantity' field.
+      * How much was ordered.
+      * @param value The value of 'quantity'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setQuantity(double value) {
+      validate(fields()[3], value);
+      this.quantity = value;
+      fieldSetFlags()[3] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'quantity' field has been set.
+      * How much was ordered.
+      * @return True if the 'quantity' field has been set, false otherwise.
+      */
+    public boolean hasQuantity() {
+      return fieldSetFlags()[3];
+    }
+
+
+    /**
+      * Clears the value of the 'quantity' field.
+      * How much was ordered.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearQuantity() {
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'description' field.
+      * Description of item.
+      * @return The value.
+      */
+    public java.lang.String getDescription() {
+      return description;
+    }
+
+
+    /**
+      * Sets the value of the 'description' field.
+      * Description of item.
+      * @param value The value of 'description'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setDescription(java.lang.String value) {
+      validate(fields()[4], value);
+      this.description = value;
+      fieldSetFlags()[4] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'description' field has been set.
+      * Description of item.
+      * @return True if the 'description' field has been set, false otherwise.
+      */
+    public boolean hasDescription() {
+      return fieldSetFlags()[4];
+    }
+
+
+    /**
+      * Clears the value of the 'description' field.
+      * Description of item.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearDescription() {
+      description = null;
+      fieldSetFlags()[4] = false;
+      return this;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Order build() {
+      try {
+        Order record = new Order();
+        record.orderId = fieldSetFlags()[0] ? this.orderId : (java.lang.Integer) defaultValue(fields()[0]);
+        record.itemId = fieldSetFlags()[1] ? this.itemId : (java.lang.String) defaultValue(fields()[1]);
+        record.userId = fieldSetFlags()[2] ? this.userId : (java.lang.String) defaultValue(fields()[2]);
+        record.quantity = fieldSetFlags()[3] ? this.quantity : (java.lang.Double) defaultValue(fields()[3]);
+        record.description = fieldSetFlags()[4] ? this.description : (java.lang.String) defaultValue(fields()[4]);
+        return record;
+      } catch (org.apache.avro.AvroMissingFieldException e) {
+        throw e;
+      } catch (java.lang.Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumWriter<Order>
+    WRITER$ = (org.apache.avro.io.DatumWriter<Order>)MODEL$.createDatumWriter(SCHEMA$);
+
+  @Override public void writeExternal(java.io.ObjectOutput out)
+    throws java.io.IOException {
+    WRITER$.write(this, SpecificData.getEncoder(out));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumReader<Order>
+    READER$ = (org.apache.avro.io.DatumReader<Order>)MODEL$.createDatumReader(SCHEMA$);
+
+  @Override public void readExternal(java.io.ObjectInput in)
+    throws java.io.IOException {
+    READER$.read(this, SpecificData.getDecoder(in));
+  }
+
+  @Override protected boolean hasCustomCoders() { return true; }
+
+  @Override public void customEncode(org.apache.avro.io.Encoder out)
+    throws java.io.IOException
+  {
+    out.writeInt(this.orderId);
+
+    out.writeString(this.itemId);
+
+    out.writeString(this.userId);
+
+    out.writeDouble(this.quantity);
+
+    out.writeString(this.description);
+
+  }
+
+  @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in)
+    throws java.io.IOException
+  {
+    org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
+    if (fieldOrder == null) {
+      this.orderId = in.readInt();
+
+      this.itemId = in.readString();
+
+      this.userId = in.readString();
+
+      this.quantity = in.readDouble();
+
+      this.description = in.readString();
+
+    } else {
+      for (int i = 0; i < 5; i++) {
+        switch (fieldOrder[i].pos()) {
+        case 0:
+          this.orderId = in.readInt();
+          break;
+
+        case 1:
+          this.itemId = in.readString();
+          break;
+
+        case 2:
+          this.userId = in.readString();
+          break;
+
+        case 3:
+          this.quantity = in.readDouble();
+          break;
+
+        case 4:
+          this.description = in.readString();
+          break;
+
+        default:
+          throw new java.io.IOException("Corrupt ResolvingDecoder.");
+        }
+      }
+    }
+  }
+}
+
+
+
+
+
+
+
+
+
+

