This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 2029cbe CAMEL-16882 Add ddl sql output in header (#5980) 2029cbe is described below commit 2029cbe05cbf02b3d0027320e7bc87884f4a80c4 Author: Winger <winger2...@gmail.com> AuthorDate: Tue Aug 24 22:58:23 2021 +0800 CAMEL-16882 Add ddl sql output in header (#5980) * CAMEL-16882 Add ddl sql output in header * CAMEL-16882 test class fmt * fix header key with DdlSQL --- .../component/debezium/DebeziumConstants.java | 1 + .../camel/component/debezium/DebeziumEndpoint.java | 4 +++- .../component/debezium/DebeziumEndpointTest.java | 25 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java index 718866c..eefcebb 100644 --- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java +++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java @@ -34,6 +34,7 @@ public final class DebeziumConstants { public static final String HEADER_OPERATION = HEADER_PREFIX + "Operation"; public static final String HEADER_TIMESTAMP = HEADER_PREFIX + "Timestamp"; public static final String HEADER_BEFORE = HEADER_PREFIX + "Before"; + public static final String HEADER_DDL_SQL = HEADER_PREFIX + "DdlSQL"; private DebeziumConstants() { } diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java index 19f1c4e..f9cfc4c 100644 --- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java +++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import io.debezium.data.Envelope; +import io.debezium.relational.history.HistoryRecord; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -82,7 +83,7 @@ public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration> final Object before = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.BEFORE); final Object body = extractBodyValueFromValueStruct(valueSchema, value); final Object timestamp = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.TIMESTAMP); - + final Object ddl = extractValueFromValueStruct(valueSchema, value, HistoryRecord.Fields.DDL_STATEMENTS); // set message headers message.setHeader(DebeziumConstants.HEADER_IDENTIFIER, record.topic()); message.setHeader(DebeziumConstants.HEADER_KEY, record.key()); @@ -90,6 +91,7 @@ public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration> message.setHeader(DebeziumConstants.HEADER_OPERATION, operation); message.setHeader(DebeziumConstants.HEADER_BEFORE, before); message.setHeader(DebeziumConstants.HEADER_TIMESTAMP, timestamp); + message.setHeader(DebeziumConstants.HEADER_DDL_SQL, ddl); message.setHeader(Exchange.MESSAGE_TIMESTAMP, timestamp); message.setBody(body); diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java index 4c9c4ce..1a69941 100644 --- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java +++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java @@ -199,6 +199,20 @@ public class DebeziumEndpointTest { assertNull(body); } + @Test + void testIfCreatesExchangeFromSourceDdlRecord() { + final SourceRecord sourceRecord = createDdlSQLRecord(); + + final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord); + final Message inMessage = exchange.getIn(); + + assertNotNull(exchange); + // assert headers + assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER)); + assertEquals("SET character_set_server=utf8, collation_server=utf8_bin", + inMessage.getHeader(DebeziumConstants.HEADER_DDL_SQL)); + } + private SourceRecord createCreateRecord() { final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build(); final Schema sourceSchema = SchemaBuilder.struct().field("lsn", SchemaBuilder.int32()).build(); @@ -256,6 +270,17 @@ public class DebeziumEndpointTest { createKeyRecord(), envelope.schema(), payload); } + private SourceRecord createDdlSQLRecord() { + final Schema recordSchema = SchemaBuilder.struct().field("ddl", SchemaBuilder.string()).build(); + Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()) + .build(); + final Struct recordValue = new Struct(recordSchema); + recordValue.put("ddl", "SET character_set_server=utf8, collation_server=utf8_bin"); + return new SourceRecord( + new HashMap<>(), createSourceOffset(), "dummy", null, + null, recordValue.schema(), recordValue); + } + private SourceRecord createUnknownUnnamedSchemaRecord() { final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build(); final Struct before = new Struct(recordSchema);