This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new af28832 [test](e2e) Add E2E Test Cases for Kafka Connect Transforms (#64) af28832 is described below commit af28832064f07c84fadd495bef8c2ee791dd7ca5 Author: Petrichor <1401597...@qq.com> AuthorDate: Wed Feb 12 15:15:23 2025 +0800 [test](e2e) Add E2E Test Cases for Kafka Connect Transforms (#64) --- .../e2e/sink/stringconverter/StringMsgE2ETest.java | 24 +++++++++++++++++++++ .../e2e/transforms/rename_transforms.json | 25 ++++++++++++++++++++++ .../resources/e2e/transforms/rename_transforms.sql | 12 +++++++++++ 3 files changed, 61 insertions(+) diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index 294cb33..dba6d54 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -248,6 +248,30 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expected, query2, 3); } + @Test + public void testRenameTransform() throws Exception { + initialize("src/test/resources/e2e/transforms/rename_transforms.json"); + String topic = "kf_rename_transform_msg"; + String msg1 = "{\"id\":1,\"old_col1\":\"col1\",\"col2\":\"col2\"}"; + String msg2 = "{\"id\":2,\"old_col1\":\"col1_1\",\"col2\":\"col2\"}"; + produceMsg2Kafka(topic, msg1); + produceMsg2Kafka(topic, msg2); + + String tableSql1 = loadContent("src/test/resources/e2e/transforms/rename_transforms.sql"); + createTable(tableSql1); + + Thread.sleep(2000); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + List<String> expectedResult = Arrays.asList("1,col1,col2", "2,col1_1,col2"); + Thread.sleep(10000); + String query1 = + String.format( + "select id,col1,col2 from %s.%s order by id", + database, "rename_transform_msg"); + checkResult(expectedResult, query1, 3); + } + public void checkResult(List<String> expected, String query, int columnSize) throws Exception { List<String> actual = new ArrayList<>(); diff --git a/src/test/resources/e2e/transforms/rename_transforms.json b/src/test/resources/e2e/transforms/rename_transforms.json new file mode 100644 index 0000000..c8c2138 --- /dev/null +++ b/src/test/resources/e2e/transforms/rename_transforms.json @@ -0,0 +1,25 @@ +{ + "name":"rename_transforms_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"kf_rename_transform_msg", + "tasks.max":"1", + "doris.topic2table.map": "kf_rename_transform_msg:rename_transform_msg", + "buffer.count.records":"2", + "buffer.flush.time":"11", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"transforms_msg", + "load.model":"stream_load", + "transforms":"renameField", + "transforms.renameField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value", + "transforms.renameField.renames":"old_col1:col1", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/transforms/rename_transforms.sql b/src/test/resources/e2e/transforms/rename_transforms.sql new file mode 100644 index 0000000..4bb076e --- /dev/null +++ b/src/test/resources/e2e/transforms/rename_transforms.sql @@ -0,0 +1,12 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE transforms_msg.rename_transform_msg ( + id INT NULL, + col1 VARCHAR(20) NULL, + col2 varchar(20) NULL +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org