This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit a27015c14f5ec09a1b0214e7185d8f72e74cb5c8
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Mon Nov 16 23:38:23 2020 +0100

    fix #692 : Cache already computed Schemas in PojoToSchemaAndStructTransform.
---
 .../transforms/PojoToSchemaAndStructTransform.java | 90 ++++++++++++++++------
 .../PojoToSchemaAndStructTransformTest.java        | 41 +++++++++-
 2 files changed, 104 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
index d458636..1ecf48f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
@@ -18,9 +18,13 @@ package org.apache.camel.kafkaconnector.transforms;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.dataformat.avro.AvroFactory;
 import com.fasterxml.jackson.dataformat.avro.AvroSchema;
 import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
@@ -44,39 +48,54 @@ public class PojoToSchemaAndStructTransform <R extends ConnectRecord<R>> impleme
     private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
 
     private AvroData avroData;
+    private ConcurrentMap<String, CacheEntry> avroSchemaWrapperCache;
 
     @Override
     public R apply(R r) {
-        LOG.debug("Incoming record: " + r);
+        LOG.debug("Incoming record: {}", r);
 
-        AvroSchemaGenerator gen = new AvroSchemaGenerator();
+        if (r.value() != null) {
+            String recordClassCanonicalName = r.value().getClass().getCanonicalName();
+            CacheEntry cacheEntry = avroSchemaWrapperCache.computeIfAbsent(recordClassCanonicalName, new Function<String, CacheEntry>() {
+                @Override
+                public CacheEntry apply(String s) {
+                    //cache miss
+                    AvroSchemaGenerator gen = new AvroSchemaGenerator();
 
-        try {
-            MAPPER.acceptJsonFormatVisitor(r.value().getClass(), gen);
-        } catch (JsonMappingException e) {
-            throw new ConnectException("Error in generating POJO schema.", e);
-        }
+                    try {
+                        MAPPER.acceptJsonFormatVisitor(r.value().getClass(), gen);
+                    } catch (JsonMappingException e) {
+                        throw new ConnectException("Error in generating POJO schema.", e);
+                    }
 
-        AvroSchema schemaWrapper = gen.getGeneratedSchema();
-        org.apache.avro.Schema avroSchema = schemaWrapper.getAvroSchema();
-        LOG.debug("Generated avro schema: " + avroSchema.toString(true));
+                    AvroSchema schemaWrapper = gen.getGeneratedSchema();
+                    LOG.debug("Generated and cached avro schema: {}", schemaWrapper.getAvroSchema().toString(true));
 
-        SchemaAndValue connectSchemaAndData = null;
-        try {
-            byte[] avroDataByte = MAPPER.writer(schemaWrapper).writeValueAsBytes(r.value());
-            Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
-            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
-            GenericRecord genericAvroData = datumReader.read(null, decoder);
+                    return new CacheEntry(schemaWrapper, MAPPER.writer(schemaWrapper));
+                }
+            });
 
-            connectSchemaAndData = this.avroData.toConnectData(avroSchema, genericAvroData);
-        } catch (IOException e) {
-            throw new ConnectException("Error in generating POJO Struct.", e);
-        }
+            SchemaAndValue connectSchemaAndData = null;
+            try {
+                byte[] avroDataByte = cacheEntry.getObjectWriter().writeValueAsBytes(r.value());
+                Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
+                org.apache.avro.Schema avroSchema = cacheEntry.getAvroSchemaWrapper().getAvroSchema();
+                DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
+                GenericRecord genericAvroData = datumReader.read(null, decoder);
 
-        LOG.debug("Generate kafka connect schema: " + connectSchemaAndData.schema());
-        LOG.debug("Generate kafka connect value (as Struct): " + connectSchemaAndData.value());
-        return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
-                connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+                connectSchemaAndData = this.avroData.toConnectData(avroSchema, genericAvroData);
+            } catch (IOException e) {
+                throw new ConnectException("Error in generating POJO Struct.", e);
+            }
+
+            LOG.debug("Generate kafka connect schema: {}", connectSchemaAndData.schema());
+            LOG.debug("Generate kafka connect value (as Struct): {}", connectSchemaAndData.value());
+            return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
+                    connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+        } else {
+            LOG.debug("Incoming record with a null value, nothing to be done.");
+            return r;
+        }
     }
 
     @Override
@@ -91,6 +110,29 @@ public class PojoToSchemaAndStructTransform <R extends ConnectRecord<R>> impleme
 
     @Override
     public void configure(Map<String, ?> configs) {
+        this.avroSchemaWrapperCache = new ConcurrentHashMap<>();
         this.avroData = new AvroData(new AvroDataConfig(configs));
     }
+
+    public Map<String, CacheEntry> getCache() {
+        return this.avroSchemaWrapperCache;
+    }
+
+    public class CacheEntry {
+        private AvroSchema avroSchemaWrapper;
+        private ObjectWriter objectWriter;
+
+        public CacheEntry(AvroSchema avroSchemaWrapper, ObjectWriter objectWriter) {
+            this.avroSchemaWrapper = avroSchemaWrapper;
+            this.objectWriter = objectWriter;
+        }
+
+        public AvroSchema getAvroSchemaWrapper() {
+            return avroSchemaWrapper;
+        }
+
+        public ObjectWriter getObjectWriter() {
+            return objectWriter;
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
index 9bc617c..e378eab 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
@@ -122,14 +122,49 @@ public class PojoToSchemaAndStructTransformTest {
                 Schema.STRING_SCHEMA, "testKeyValue",
                 Schema.BYTES_SCHEMA, map);
 
-        assertThrows(ConnectException.class, () -> {pojoToSchemaAndStructTransform.apply(cr);});
+        assertThrows(ConnectException.class, () -> {
+            pojoToSchemaAndStructTransform.apply(cr);
+        });
+    }
+
+    @Test()
+    public void testNullValueConversion() {
+        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, null);
+
+        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(cr, transformedCr);
+    }
+
+    @Test()
+    public void testConversionCache() {
+        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        PojoWithMap pwm = new PojoWithMap();
+        pwm.addToMap("ciao", 9);
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, pwm);
+
+        assertEquals(0, pojoToSchemaAndStructTransform.getCache().keySet().size());
+        pojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
+        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
+        assertTrue(pojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName()));
     }
 
     private void atLeastOneFieldWithGivenValueExists(List structs, String fieldName, String fieldExpectedValue) {
         structs.stream().filter(
-            struct -> ((Struct) struct).getString(fieldName) == null ? false : true
+                struct -> ((Struct) struct).getString(fieldName) == null ? false : true
         ).forEach(
-            struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName))
+                struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName))
         );
     }
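
For readers following the change: the heart of the fix is the per-class cache, where the Avro schema and the Jackson ObjectWriter are built once for each record value class and reused for every later record of that class. Below is a minimal, self-contained sketch of that ConcurrentHashMap.computeIfAbsent pattern; the names (SchemaCacheSketch, entryFor, buildEntry) are illustrative only and are not part of this commit, and the expensive schema generation is replaced by a stand-in string.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only: mirrors the caching pattern used in
// PojoToSchemaAndStructTransform, keyed by the value's canonical class name.
public class SchemaCacheSketch {

    private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();

    public String entryFor(Object recordValue) {
        // computeIfAbsent runs the expensive build at most once per key,
        // even when several threads hit the transform concurrently.
        return cache.computeIfAbsent(
                recordValue.getClass().getCanonicalName(),
                className -> buildEntry(className));
    }

    private String buildEntry(String className) {
        // Stand-in for the real work: generating an Avro schema and an ObjectWriter.
        return "schema-for-" + className;
    }

    public static void main(String[] args) {
        SchemaCacheSketch sketch = new SchemaCacheSketch();
        System.out.println(sketch.entryFor("first record"));   // cache miss: entry is built
        System.out.println(sketch.entryFor("second record"));  // cache hit: same class, entry reused
    }
}

Because the cache is keyed on the canonical class name, records of the same POJO type share one cached schema and writer regardless of their field values, which is why the second apply() in testConversionCache leaves the cache size at 1.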