This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c5b8396  [Improve] check topic mutating SMTs (#68)
c5b8396 is described below

commit c5b83966103b7eb04799d524118f787ca122f6f5
Author: wangchuang <59386838+chuang-wang-...@users.noreply.github.com>
AuthorDate: Fri Apr 25 16:02:03 2025 +0800

    [Improve] check topic mutating SMTs (#68)
---
 .../doris/kafka/connector/DorisSinkTask.java       |  2 +-
 .../connector/service/DorisDefaultSinkService.java | 20 ++++++++++-
 .../connector/service/DorisSinkServiceFactory.java |  6 ++--
 .../connector/e2e/kafka/KafkaContainerService.java |  2 ++
 .../e2e/kafka/KafkaContainerServiceImpl.java       | 33 ++++++++++++++++++
 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 25 ++++++++++++++
 .../connector/service/TestDorisSinkService.java    | 40 +++++++++++++++++++++-
 .../e2e/transforms/regex_router_transforms.json    | 26 ++++++++++++++
 .../e2e/transforms/regex_router_transforms.sql     | 12 +++++++
 9 files changed, 161 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java 
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index e83f033..576de26 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -55,7 +55,7 @@ public class DorisSinkTask extends SinkTask {
         LOG.info("kafka doris sink task start with {}", parsedConfig);
         this.options = new DorisOptions(parsedConfig);
         this.remainingRetries = options.getMaxRetries();
-        this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig);
+        this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig, 
context);
     }
 
     /**
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 36f78fe..3a220f1 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -43,7 +43,9 @@ import org.apache.doris.kafka.connector.writer.load.LoadModel;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,9 +67,11 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
     private final MetricsJmxReporter metricsJmxReporter;
     private final DorisConnectMonitor connectMonitor;
     private final ObjectMapper objectMapper;
+    private final SinkTaskContext context;
 
-    DorisDefaultSinkService(Map<String, String> config) {
+    DorisDefaultSinkService(Map<String, String> config, SinkTaskContext 
context) {
         this.dorisOptions = new DorisOptions(config);
+        this.context = context;
         this.objectMapper = new ObjectMapper();
         this.writer = new HashMap<>();
         this.conn = new JdbcConnectionProvider(dorisOptions);
@@ -131,6 +135,8 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
                         record.kafkaOffset());
                 continue;
             }
+            // check topic mutating SMTs
+            checkTopicMutating(record);
             // Might happen a count of record based flushing,buffer
             insert(record);
         }
@@ -194,6 +200,18 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
                         });
     }
 
+    /** Check if the topic of record is mutated */
+    public void checkTopicMutating(SinkRecord record) {
+        TopicPartition tp = new TopicPartition(record.topic(), 
record.kafkaPartition());
+        if (!context.assignment().contains(tp)) {
+            throw new ConnectException(
+                    "Unexpected topic name: ["
+                            + record.topic()
+                            + "] that doesn't match assigned partitions. "
+                            + "Connector doesn't support topic mutating SMTs. 
");
+        }
+    }
+
     /**
      * Get the table name in doris for the given record.
      *
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
index 05ae10d..dbf7192 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
@@ -20,11 +20,13 @@
 package org.apache.doris.kafka.connector.service;
 
 import java.util.Map;
+import org.apache.kafka.connect.sink.SinkTaskContext;
 
 /** A factory to create {@link DorisSinkService} */
 public class DorisSinkServiceFactory {
 
-    public static DorisSinkService getDorisSinkService(Map<String, String> 
connectorConfig) {
-        return new DorisDefaultSinkService(connectorConfig);
+    public static DorisSinkService getDorisSinkService(
+            Map<String, String> connectorConfig, SinkTaskContext context) {
+        return new DorisDefaultSinkService(connectorConfig, context);
     }
 }
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
index 2e9dc8d..786d536 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
@@ -37,5 +37,7 @@ public interface KafkaContainerService {
 
     void deleteKafkaConnector(String name);
 
+    String getConnectorTaskStatus(String name);
+
     void close();
 }
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
index ec39f49..cf1c164 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
@@ -19,6 +19,8 @@
 
 package org.apache.doris.kafka.connector.e2e.kafka;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -35,10 +37,12 @@ import 
org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.apache.kafka.connect.cli.ConnectDistributed;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +74,7 @@ public class KafkaContainerServiceImpl implements 
KafkaContainerService {
     private static final int MAX_RETRIES = 5;
     private GenericContainer schemaRegistryContainer;
     private static Network network = Network.SHARED;
+    private static final ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public String getInstanceHostAndPort() {
@@ -284,4 +289,32 @@ public class KafkaContainerServiceImpl implements 
KafkaContainerService {
         }
         LOG.info("{} Kafka connector deleted successfully.", name);
     }
+
+    @Override
+    public String getConnectorTaskStatus(String name) {
+        String connectUrl = "http://" + kafkaServerHost + ":" + CONNECT_PORT + 
"/connectors/";
+        String getStatusUrl = connectUrl + name + "/status";
+        HttpGet httpGet = new HttpGet(getStatusUrl);
+        try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+            StatusLine statusLine = response.getStatusLine();
+            if (statusLine.getStatusCode() != 200) {
+                LOG.warn(
+                        "Failed to get connector status, name={}, msg={}",
+                        name,
+                        statusLine.getReasonPhrase());
+            }
+            JsonNode root = 
objectMapper.readTree(EntityUtils.toString(response.getEntity()));
+            JsonNode tasks = root.get("tasks");
+            // tasks is an array, and only care about the first task
+            if (tasks != null && tasks.isArray() && tasks.size() > 0) {
+                JsonNode task = tasks.get(0);
+                return task.get("state").asText(); // RUNNING / FAILED / 
UNASSIGNED
+            } else {
+                LOG.warn("No task info found for connector: " + name);
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to get kafka connect task status, name={}", name);
+        }
+        return null;
+    }
 }
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 8c4f207..4430dad 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -306,6 +306,31 @@ public class StringMsgE2ETest extends 
AbstractStringE2ESinkTest {
         checkResult(expectedResult, query1, 3);
     }
 
+    @Test
+    public void testTopicMutatingSmt() throws Exception {
+        
initialize("src/test/resources/e2e/transforms/regex_router_transforms.json");
+        String topic = "p-regex_router_transform_msg";
+        String msg1 = "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\"}";
+        produceMsg2Kafka(topic, msg1);
+
+        String tableSql1 =
+                
loadContent("src/test/resources/e2e/transforms/regex_router_transforms.sql");
+        createTable(tableSql1);
+
+        Thread.sleep(2000);
+        kafkaContainerService.registerKafkaConnector(connectorName, 
jsonMsgConnectorContent);
+
+        List<String> expectedResult = Collections.emptyList();
+        Thread.sleep(10000);
+        String query1 =
+                String.format(
+                        "select id,col1,col2 from %s.%s order by id",
+                        database, "regex_router_transform_msg");
+
+        Assert.assertEquals("FAILED", 
kafkaContainerService.getConnectorTaskStatus(connectorName));
+        checkResult(expectedResult, query1, 3);
+    }
+
     @AfterClass
     public static void closeInstance() {
         kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
 
b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
index 7ea1c2a..dc2e6ae 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -19,6 +19,10 @@
 
 package org.apache.doris.kafka.connector.service;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
@@ -26,12 +30,15 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +60,10 @@ public class TestDorisSinkService {
         props.put("task_id", "1");
         props.put("name", "sink-connector-test");
         props.put("record.tablename.field", "table_name");
-        dorisDefaultSinkService = new DorisDefaultSinkService((Map) props);
+        SinkTaskContext context = mock(SinkTaskContext.class);
+        TopicPartition assignedTp = new TopicPartition("expected-topic", 0);
+        when(context.assignment()).thenReturn(Sets.newHashSet(assignedTp));
+        dorisDefaultSinkService = new DorisDefaultSinkService((Map) props, 
context);
         jsonConverter.configure(new HashMap<>(), false);
     }
 
@@ -132,4 +142,32 @@ public class TestDorisSinkService {
         Assert.assertEquals(
                 "test_kafka_tbl", 
dorisDefaultSinkService.getSinkDorisTableName(record5));
     }
+
+    @Test
+    public void testCheckTopicMutating() {
+        SinkRecord record1 =
+                new SinkRecord(
+                        "expected-topic",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "val",
+                        1);
+        SinkRecord record2 =
+                new SinkRecord(
+                        "mutated-topic",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "val",
+                        1);
+        dorisDefaultSinkService.checkTopicMutating(record1);
+
+        Assert.assertThrows(
+                "Unexpected topic name: [mutated-topic] that doesn't match 
assigned partitions. Connector doesn't support topic mutating SMTs.",
+                ConnectException.class,
+                () -> dorisDefaultSinkService.checkTopicMutating(record2));
+    }
 }
diff --git a/src/test/resources/e2e/transforms/regex_router_transforms.json 
b/src/test/resources/e2e/transforms/regex_router_transforms.json
new file mode 100644
index 0000000..d8aff6d
--- /dev/null
+++ b/src/test/resources/e2e/transforms/regex_router_transforms.json
@@ -0,0 +1,26 @@
+{
+  "name":"regex_router_transforms_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"p-regex_router_transform_msg",
+    "tasks.max":"1",
+    "doris.topic2table.map": 
"p-regex_router_transform_msg:regex_router_transform_msg,regex_router_transform_msg:regex_router_transform_msg",
+    "buffer.count.records":"2",
+    "buffer.flush.time":"11",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"transforms_msg",
+    "load.model":"stream_load",
+    "transforms": "dropPrefix",
+    "transforms.dropPrefix.type": 
"org.apache.kafka.connect.transforms.RegexRouter",
+    "transforms.dropPrefix.regex": "p-(.*)",
+    "transforms.dropPrefix.replacement": "$1",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/transforms/regex_router_transforms.sql 
b/src/test/resources/e2e/transforms/regex_router_transforms.sql
new file mode 100644
index 0000000..e72edcd
--- /dev/null
+++ b/src/test/resources/e2e/transforms/regex_router_transforms.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database 
in the file where the connector is registered.
+CREATE TABLE transforms_msg.regex_router_transform_msg (
+  id INT NULL,
+  col1 VARCHAR(20) NULL,
+  col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to