This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new af28832  [test](e2e) Add E2E Test Cases for Kafka Connect Transforms (#64)
af28832 is described below

commit af28832064f07c84fadd495bef8c2ee791dd7ca5
Author: Petrichor <1401597...@qq.com>
AuthorDate: Wed Feb 12 15:15:23 2025 +0800

    [test](e2e) Add E2E Test Cases for Kafka Connect Transforms (#64)
---
 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 24 +++++++++++++++++++++
 .../e2e/transforms/rename_transforms.json          | 25 ++++++++++++++++++++++
 .../resources/e2e/transforms/rename_transforms.sql | 12 +++++++++++
 3 files changed, 61 insertions(+)

diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 294cb33..dba6d54 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -248,6 +248,30 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         checkResult(expected, query2, 3);
     }
 
+    @Test
+    public void testRenameTransform() throws Exception {
+        initialize("src/test/resources/e2e/transforms/rename_transforms.json");
+        String topic = "kf_rename_transform_msg";
+        String msg1 = "{\"id\":1,\"old_col1\":\"col1\",\"col2\":\"col2\"}";
+        String msg2 = "{\"id\":2,\"old_col1\":\"col1_1\",\"col2\":\"col2\"}";
+        produceMsg2Kafka(topic, msg1);
+        produceMsg2Kafka(topic, msg2);
+
+        String tableSql1 = loadContent("src/test/resources/e2e/transforms/rename_transforms.sql");
+        createTable(tableSql1);
+
+        Thread.sleep(2000);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        List<String> expectedResult = Arrays.asList("1,col1,col2", "2,col1_1,col2");
+        Thread.sleep(10000);
+        String query1 =
+                String.format(
+                        "select id,col1,col2 from %s.%s order by id",
+                        database, "rename_transform_msg");
+        checkResult(expectedResult, query1, 3);
+    }
+
     public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
         List<String> actual = new ArrayList<>();
 
diff --git a/src/test/resources/e2e/transforms/rename_transforms.json b/src/test/resources/e2e/transforms/rename_transforms.json
new file mode 100644
index 0000000..c8c2138
--- /dev/null
+++ b/src/test/resources/e2e/transforms/rename_transforms.json
@@ -0,0 +1,25 @@
+{
+  "name":"rename_transforms_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"kf_rename_transform_msg",
+    "tasks.max":"1",
+    "doris.topic2table.map": "kf_rename_transform_msg:rename_transform_msg",
+    "buffer.count.records":"2",
+    "buffer.flush.time":"11",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"transforms_msg",
+    "load.model":"stream_load",
+    "transforms":"renameField",
+    "transforms.renameField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
+    "transforms.renameField.renames":"old_col1:col1",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/transforms/rename_transforms.sql b/src/test/resources/e2e/transforms/rename_transforms.sql
new file mode 100644
index 0000000..4bb076e
--- /dev/null
+++ b/src/test/resources/e2e/transforms/rename_transforms.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE transforms_msg.rename_transform_msg (
+  id INT NULL,
+  col1 VARCHAR(20) NULL,
+  col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to