This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 2a1a50760146ef13a52ce0b622cedf39e04b7c34 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Fri Mar 5 22:15:53 2021 +0100 Properly handling UnitOfWork by compelting it at the last possible moment, properly fix #202 --- core/pom.xml | 19 ++++- .../kafkaconnector/CamelSourceConnectorConfig.java | 5 ++ .../camel/kafkaconnector/CamelSourceRecord.java | 43 ++++++++++ .../camel/kafkaconnector/CamelSourceTask.java | 96 +++++++++++++++++----- .../camel/kafkaconnector/CamelSourceTaskTest.java | 50 +++++++++++ .../CamelTypeConverterTransformTest.java | 25 ++++++ parent/pom.xml | 8 +- 7 files changed, 219 insertions(+), 27 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 2e32d16..f59f70d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -49,6 +49,10 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-seda</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-kafka</artifactId> </dependency> <dependency> @@ -56,6 +60,13 @@ <artifactId>camel-core-languages</artifactId> </dependency> + <!-- Tools --> + <dependency> + <groupId>org.jctools</groupId> + <artifactId>jctools-core</artifactId> + <version>${version.jctools}</version> + </dependency> + <!-- Kafka --> <dependency> <groupId>org.apache.kafka</groupId> @@ -108,22 +119,22 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-seda</artifactId> + <artifactId>camel-timer</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-timer</artifactId> + <artifactId>camel-log</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-log</artifactId> + <artifactId>camel-slack</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-slack</artifactId> + <artifactId>camel-netty-http</artifactId> <scope>test</scope> </dependency> diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index bb4f8f8..4acfa62 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -54,6 +54,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { public static final String CAMEL_SOURCE_MAX_POLL_DURATION_CONF = "camel.source.maxPollDuration"; public static final String CAMEL_SOURCE_MAX_POLL_DURATION_DOC = "The maximum time in milliseconds spent in a single call to poll()"; + public static final Integer CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT = 1024; + public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF = "camel.source.maxNotCommittedRecords"; + public static final String CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC = "The maximum number of non committed kafka connect records that can be tolerated before stop polling new records (rounded to the next power of 2) with a minimum of 4."; + public static final Long CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT = 1000L; public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF = "camel.source.pollingConsumerQueueSize"; public static final String CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC = "The queue size for the internal hand-off queue between the polling consumer, and producers sending data into the queue."; @@ -82,6 +86,7 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC) .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC) .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC) + .define(CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, Type.INT, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_DOC) .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC) .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC) .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java new file mode 100644 index 0000000..87934ef --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceRecord.java @@ -0,0 +1,43 @@ +package org.apache.camel.kafkaconnector; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.Map; + +public class CamelSourceRecord extends SourceRecord { + private Integer claimCheck = null; + + public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) { + super(sourcePartition, sourceOffset, topic, partition, valueSchema, value); + } + + public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema valueSchema, Object value) { + super(sourcePartition, sourceOffset, topic, valueSchema, value); + } + + public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value) { + super(sourcePartition, sourceOffset, topic, keySchema, key, valueSchema, value); + } + + public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) { + super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value); + } + + public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) { + super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp); + } + + public CamelSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { + super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, headers); + } + + public Integer getClaimCheck() { + return claimCheck; + } + + public void setClaimCheck(Integer claimCheck) { + this.claimCheck = claimCheck; + } +} diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 16e6bfc..03d0c1a 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -16,30 +16,37 @@ */ package org.apache.camel.kafkaconnector; -import java.math.BigDecimal; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; - import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.LoggingLevel; import org.apache.camel.PollingConsumer; +import org.apache.camel.StreamCache; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain; import org.apache.camel.kafkaconnector.utils.SchemaHelper; import org.apache.camel.kafkaconnector.utils.TaskHelper; +import org.apache.camel.support.UnitOfWorkHelper; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; + public class CamelSourceTask extends SourceTask { public static final String HEADER_CAMEL_PREFIX = "CamelHeader."; public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty."; @@ -49,18 +56,23 @@ public class CamelSourceTask extends SourceTask { private static final String CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX = "camel.source.endpoint."; private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path."; - private static final String LOCAL_URL = "direct:end"; + private static final String LOCAL_URL = "seda:end"; private CamelKafkaConnectMain cms; private PollingConsumer consumer; private String[] topics; private Long maxBatchPollSize; private Long maxPollDuration; + private Integer maxNotCommittedRecords; private String camelMessageHeaderKey; private LoggingLevel loggingLevel = LoggingLevel.OFF; + private Exchange[] exchangesWaitingForAck; + //the assumption is that at most 1 thread is running poll() method and at most 1 thread is running commitRecord() + private SpscArrayQueue<Integer> freeSlots; private boolean mapProperties; private boolean mapHeaders; + @Override public String version() { return VersionUtil.getVersion(); @@ -82,6 +94,7 @@ public class CamelSourceTask extends SourceTask { maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF); maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF); + maxNotCommittedRecords = config.getInt(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF); camelMessageHeaderKey = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF); @@ -105,10 +118,24 @@ public class CamelSourceTask extends SourceTask { final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF); mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF); - + topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); - String localUrl = getLocalUrlWithPollingOptions(config); + long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF); + long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF); + boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF); + String localUrl = getLocalUrlWithPollingOptions(pollingConsumerQueueSize, pollingConsumerBlockTimeout, pollingConsumerBlockWhenFull); + + freeSlots = new SpscArrayQueue<>(maxNotCommittedRecords); + freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() { + int i = 0; + @Override + public Integer get() { + return i++; + } + }); + //needs to be done like this because freeSlots capacity is rounded to the next power of 2 of maxNotCommittedRecords + exchangesWaitingForAck = new Exchange[freeSlots.capacity()]; CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { @@ -155,13 +182,14 @@ public class CamelSourceTask extends SourceTask { @Override public synchronized List<SourceRecord> poll() { + LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size()); final long startPollEpochMilli = Instant.now().toEpochMilli(); long remaining = remaining(startPollEpochMilli, maxPollDuration); long collectedRecords = 0L; List<SourceRecord> records = new ArrayList<>(); - while (collectedRecords < maxBatchPollSize && remaining > 0) { + while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) { Exchange exchange = consumer.receive(remaining); if (exchange == null) { // Nothing received, abort and return what we received so far @@ -177,31 +205,46 @@ public class CamelSourceTask extends SourceTask { Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId()); final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null; - final Object messageBodyValue = exchange.getMessage().getBody(); + Object messageBodyValue = exchange.getMessage().getBody(); final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null; final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null; final long timestamp = calculateTimestamp(exchange); + // take in account Cached camel streams + if (messageBodyValue instanceof StreamCache) { + StreamCache sc = (StreamCache) messageBodyValue; + // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs) + sc.reset(); + try { + messageBodyValue = sc.copy(exchange); + } catch (IOException e) { + e.printStackTrace(); + } + } for (String singleTopic : topics) { - SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema, + CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue, timestamp); if (mapHeaders) { if (exchange.getMessage().hasHeaders()) { - setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); + setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); } } if (mapProperties) { if (exchange.hasProperties()) { - setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); + setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); } } - TaskHelper.logRecordContent(LOG, loggingLevel, record); - records.add(record); + TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord); + Integer claimCheck = freeSlots.remove(); + camelRecord.setClaimCheck(claimCheck); + exchangesWaitingForAck[claimCheck] = exchange; + LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", camelRecord, exchange, claimCheck); + records.add(camelRecord); } collectedRecords++; remaining = remaining(startPollEpochMilli, maxPollDuration); @@ -211,6 +254,18 @@ public class CamelSourceTask extends SourceTask { } @Override + public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException { + ///XXX: this should be a safe cast please see: https://issues.apache.org/jira/browse/KAFKA-12391 + Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck(); + LOG.debug("Committing record with claim check number: {}", claimCheck); + Exchange correlatedExchange = exchangesWaitingForAck[claimCheck]; + exchangesWaitingForAck[claimCheck] = null; + freeSlots.add(claimCheck); + UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG); + LOG.debug("Record with claim check number: {} committed.", claimCheck); + } + + @Override public void stop() { LOG.info("Stopping CamelSourceTask connector task"); try { @@ -301,10 +356,7 @@ public class CamelSourceTask extends SourceTask { } } - private String getLocalUrlWithPollingOptions(CamelSourceConnectorConfig config) { - long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF); - long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF); - boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF); + private String getLocalUrlWithPollingOptions(long pollingConsumerQueueSize, long pollingConsumerBlockTimeout, boolean pollingConsumerBlockWhenFull) { return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull; } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 21d56fc..51b4db3 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -16,15 +16,19 @@ */ package org.apache.camel.kafkaconnector; +import java.awt.print.PrinterJob; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.ProducerTemplate; import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator; @@ -77,6 +81,24 @@ public class CamelSourceTaskTest { } @Test + public void testSourcePollingMaxNotCommittedRecords() { + final long size = 4; + Map<String, String> props = new HashMap<>(); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF, String.valueOf(size)); + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); + + sendBatchOfRecords(sourceTask, size + 1); + List<SourceRecord> poll = sourceTask.poll(); + + assertEquals(4, poll.size()); + sourceTask.stop(); + } + + @Test public void testSourcePollingMaxBatchPollSize() { final long size = 2; Map<String, String> props = new HashMap<>(); @@ -621,4 +643,32 @@ public class CamelSourceTaskTest { sourceTask.stop(); } + + @Test + public void testRequestReply() throws InterruptedException { + Map<String, String> props = new HashMap<>(); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(new Runnable() { + @Override + public void run() { + final ProducerTemplate template = sourceTask.getCms().getProducerTemplate(); + String result = template.requestBody(DIRECT_URI, "test", String.class); + assertEquals("test", result); + } + }); + + List<SourceRecord> poll = sourceTask.poll(); + assertEquals(1, poll.size()); + + sourceTask.commitRecord(poll.get(0), null); + + sourceTask.stop(); + executor.shutdown(); + } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java index 92c668b..c6cecbf 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java @@ -16,10 +16,14 @@ */ package org.apache.camel.kafkaconnector.transforms; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import io.netty.buffer.Unpooled; +import org.apache.camel.component.netty.http.NettyChannelBufferStreamCache; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -63,6 +67,27 @@ public class CamelTypeConverterTransformTest { } @Test + public void testIfItConvertsNettyCorrectly() { + final String testMessage = "testMessage"; + NettyChannelBufferStreamCache nettyTestValue = new NettyChannelBufferStreamCache(Unpooled.wrappedBuffer(testMessage.getBytes(Charset.defaultCharset()))); + + final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", Schema.BYTES_SCHEMA, nettyTestValue); + + final Map<String, Object> propsForValueSmt = new HashMap<>(); + propsForValueSmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.String"); + + final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>(); + + transformationValue.configure(propsForValueSmt); + + final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord); + + assertEquals(java.lang.String.class, transformedValueSourceRecord.value().getClass()); + assertEquals(Schema.STRING_SCHEMA, transformedValueSourceRecord.valueSchema()); + assertEquals(testMessage, transformedValueSourceRecord.value()); + } + + @Test public void testIfHandlesTypeConvertersFromCamelComponents() { // we know we have a type converter from struct to map in dbz component, so we use this for testing final Schema schema = SchemaBuilder.struct() diff --git a/parent/pom.xml b/parent/pom.xml index 85a4fa7..4230a7f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -35,6 +35,7 @@ <version.guava>20.0</version.guava> <version.javax.annotation-api>1.3.2</version.javax.annotation-api> <version.postgres>42.2.14</version.postgres> + <version.jctools>3.3.0</version.jctools> <version.maven.compiler>3.8.1</version.maven.compiler> <version.maven.javadoc>3.1.1</version.maven.javadoc> @@ -57,7 +58,6 @@ <!-- Note: we are deliberately overriding this one due to GH issue #990 --> <testcontainers-version>1.15.2</testcontainers-version> - <mycila-license-version>3.0</mycila-license-version> <gmavenplus-plugin-version>1.9.0</gmavenplus-plugin-version> <groovy-version>3.0.7</groovy-version> @@ -116,6 +116,12 @@ <version>${version.guava}</version> </dependency> + <dependency> + <groupId>org.jctools</groupId> + <artifactId>jctools-core</artifactId> + <version>${version.jctools}</version> + </dependency> + <!-- Kafka dependencies --> <dependency> <groupId>org.apache.kafka</groupId>