This is an automated email from the ASF dual-hosted git repository. oalsafi pushed a commit to branch add-struct-check in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 68dbfb8c1b42aec6ecdce82b0cb4195b359d5ea2 Author: Omar Al-Safi <omars...@gmail.com> AuthorDate: Tue Feb 23 10:35:27 2021 +0100 Convert Struct to Map --- .../apache/camel/kafkaconnector/CamelSinkTask.java | 35 ++++++++-- .../camel/kafkaconnector/CamelSinkTaskTest.java | 77 ++++++++++++++++++---- 2 files changed, 97 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 82c16d2..1e8fa43 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -33,7 +34,9 @@ import org.apache.camel.support.DefaultExchange; import org.apache.camel.util.StringHelper; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; @@ -110,7 +113,7 @@ public class CamelSinkTask extends SinkTask { final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF); mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF); - + CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { remoteUrl = TaskHelper.buildUrl(camelContext, @@ -175,8 +178,8 @@ public class CamelSinkTask extends SinkTask { TaskHelper.logRecordContent(LOG, loggingLevel, record); Exchange exchange = new DefaultExchange(producer.getCamelContext()); - exchange.getMessage().setBody(record.value()); - exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key()); + exchange.getMessage().setBody(convertValueFromStruct(record.valueSchema(), record.value())); + exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, convertValueFromStruct(record.keySchema(), record.key())); for (Header header : record.headers()) { if (header.key().startsWith(HEADER_CAMEL_PREFIX)) { @@ -232,8 +235,32 @@ public class CamelSinkTask extends SinkTask { if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) { destination.put(key, Decimal.toLogical(schema, (byte[]) header.value())); } else { - destination.put(key, header.value()); + destination.put(key, convertValueFromStruct(header.schema(), header.value())); + } + } + + private static Object convertValueFromStruct(Schema schema, Object value) { + // if we have a schema of type Struct, we convert it to map, otherwise + // we just return the value as it + if (schema != null && value != null && Schema.Type.STRUCT == schema.type()) { + return toMap((Struct) value); } + + return value; + } + + private static Map<String, Object> toMap(final Struct struct) { + final HashMap<String, Object> fieldsToValues = new HashMap<>(); + + struct.schema().fields().forEach(field -> { + try { + fieldsToValues.put(field.name(), struct.get(field)); + } catch (DataException e) { + fieldsToValues.put(field.name(), null); + } + }); + + return fieldsToValues; } CamelKafkaConnectMain getCms() { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index bab0a5d..0316b2f 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; @@ -74,6 +75,52 @@ public class CamelSinkTaskTest { } @Test + public void testStructBody() { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + Schema keySchema = SchemaBuilder.struct() + .name("keySchema") + .field("id", Schema.INT32_SCHEMA) + .build(); + + Schema valueSchema = SchemaBuilder.struct() + .name("valueSchema") + .field("id", SchemaBuilder.INT32_SCHEMA) + .field("name", SchemaBuilder.STRING_SCHEMA) + .field("isAdult", SchemaBuilder.BOOLEAN_SCHEMA) + .build(); + + Struct key = new Struct(keySchema).put("id", 12); + Struct value = new Struct(valueSchema) + .put("id", 12) + .put("name", "jane doe") + .put("isAdult", true); + + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, keySchema, key, valueSchema, value, 42); + records.add(record); + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + + assertEquals("jane doe", exchange.getMessage().getBody(Map.class).get("name")); + assertEquals(12, exchange.getMessage().getBody(Map.class).get("id")); + assertTrue((Boolean) exchange.getMessage().getBody(Map.class).get("isAdult")); + + assertEquals(12, ((Map) exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)).get("id")); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + sinkTask.stop(); + } + + @Test public void testTopicsRegex() { Map<String, String> props = new HashMap<>(); props.put("topics.regex", "topic1*"); @@ -120,6 +167,12 @@ public class CamelSinkTaskTest { BigDecimal myBigDecimal = new BigDecimal(1234567890); Schema schema = Decimal.schema(myBigDecimal.scale()); + Schema headerStruct = SchemaBuilder.struct() + .field("myHeader", Schema.STRING_SCHEMA) + .build(); + + Struct headerStructValue = new Struct(headerStruct).put("myHeader", "structHeader"); + List<SinkRecord> records = new ArrayList<SinkRecord>(); SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true); @@ -130,6 +183,7 @@ public class CamelSinkTaskTest { record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger); record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong); record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema); + record.headers().addStruct(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyStruct", headerStructValue); records.add(record); sinkTask.put(records); @@ -145,10 +199,11 @@ public class CamelSinkTaskTest { assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class)); + assertEquals("structHeader", exchange.getIn().getHeader("MyStruct", Map.class).get("myHeader")); sinkTask.stop(); } - + @Test public void testBodyAndHeadersExclusions() { Map<String, String> props = new HashMap<>(); @@ -196,7 +251,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testBodyAndHeadersExclusionsRegex() { Map<String, String> props = new HashMap<>(); @@ -671,7 +726,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() { Map<String, String> props = new HashMap<>(); @@ -730,7 +785,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() { Map<String, String> props = new HashMap<>(); @@ -879,7 +934,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException { Map<String, String> props = new HashMap<>(); @@ -933,7 +988,7 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel5 camel6 camel7 camel8 camel9", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -942,7 +997,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testWithIdempotency() throws InterruptedException { Map<String, String> props = new HashMap<>(); @@ -993,13 +1048,13 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel1", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel2", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -1008,7 +1063,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testWithIdempotencyAndHeader() throws InterruptedException { Map<String, String> props = new HashMap<>(); @@ -1040,7 +1095,7 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel1", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));