This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new c5b8396 [Improve] check topic mutating SMTs (#68) c5b8396 is described below commit c5b83966103b7eb04799d524118f787ca122f6f5 Author: wangchuang <59386838+chuang-wang-...@users.noreply.github.com> AuthorDate: Fri Apr 25 16:02:03 2025 +0800 [Improve] check topic mutating SMTs (#68) --- .../doris/kafka/connector/DorisSinkTask.java | 2 +- .../connector/service/DorisDefaultSinkService.java | 20 ++++++++++- .../connector/service/DorisSinkServiceFactory.java | 6 ++-- .../connector/e2e/kafka/KafkaContainerService.java | 2 ++ .../e2e/kafka/KafkaContainerServiceImpl.java | 33 ++++++++++++++++++ .../e2e/sink/stringconverter/StringMsgE2ETest.java | 25 ++++++++++++++ .../connector/service/TestDorisSinkService.java | 40 +++++++++++++++++++++- .../e2e/transforms/regex_router_transforms.json | 26 ++++++++++++++ .../e2e/transforms/regex_router_transforms.sql | 12 +++++++ 9 files changed, 161 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java index e83f033..576de26 100644 --- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java +++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java @@ -55,7 +55,7 @@ public class DorisSinkTask extends SinkTask { LOG.info("kafka doris sink task start with {}", parsedConfig); this.options = new DorisOptions(parsedConfig); this.remainingRetries = options.getMaxRetries(); - this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig); + this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig, context); } /** diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java index 36f78fe..3a220f1 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java 
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java @@ -43,7 +43,9 @@ import org.apache.doris.kafka.connector.writer.load.LoadModel; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,9 +67,11 @@ public class DorisDefaultSinkService implements DorisSinkService { private final MetricsJmxReporter metricsJmxReporter; private final DorisConnectMonitor connectMonitor; private final ObjectMapper objectMapper; + private final SinkTaskContext context; - DorisDefaultSinkService(Map<String, String> config) { + DorisDefaultSinkService(Map<String, String> config, SinkTaskContext context) { this.dorisOptions = new DorisOptions(config); + this.context = context; this.objectMapper = new ObjectMapper(); this.writer = new HashMap<>(); this.conn = new JdbcConnectionProvider(dorisOptions); @@ -131,6 +135,8 @@ public class DorisDefaultSinkService implements DorisSinkService { record.kafkaOffset()); continue; } + // check topic mutating SMTs + checkTopicMutating(record); // Might happen a count of record based flushing,buffer insert(record); } @@ -194,6 +200,18 @@ public class DorisDefaultSinkService implements DorisSinkService { }); } + /** Check if the topic of record is mutated */ + public void checkTopicMutating(SinkRecord record) { + TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition()); + if (!context.assignment().contains(tp)) { + throw new ConnectException( + "Unexpected topic name: [" + + record.topic() + + "] that doesn't match assigned partitions. " + + "Connector doesn't support topic mutating SMTs. "); + } + } + /** * Get the table name in doris for the given record. 
* diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java index 05ae10d..dbf7192 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java @@ -20,11 +20,13 @@ package org.apache.doris.kafka.connector.service; import java.util.Map; +import org.apache.kafka.connect.sink.SinkTaskContext; /** A factory to create {@link DorisSinkService} */ public class DorisSinkServiceFactory { - public static DorisSinkService getDorisSinkService(Map<String, String> connectorConfig) { - return new DorisDefaultSinkService(connectorConfig); + public static DorisSinkService getDorisSinkService( + Map<String, String> connectorConfig, SinkTaskContext context) { + return new DorisDefaultSinkService(connectorConfig, context); } } diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java index 2e9dc8d..786d536 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java @@ -37,5 +37,7 @@ public interface KafkaContainerService { void deleteKafkaConnector(String name); + String getConnectorTaskStatus(String name); + void close(); } diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java index ec39f49..cf1c164 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java @@ -19,6 +19,8 @@ package 
org.apache.doris.kafka.connector.e2e.kafka; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -35,10 +37,12 @@ import org.apache.doris.kafka.connector.exception.DorisException; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.apache.kafka.connect.cli.ConnectDistributed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +74,7 @@ public class KafkaContainerServiceImpl implements KafkaContainerService { private static final int MAX_RETRIES = 5; private GenericContainer schemaRegistryContainer; private static Network network = Network.SHARED; + private static final ObjectMapper objectMapper = new ObjectMapper(); @Override public String getInstanceHostAndPort() { @@ -284,4 +289,32 @@ public class KafkaContainerServiceImpl implements KafkaContainerService { } LOG.info("{} Kafka connector deleted successfully.", name); } + + @Override + public String getConnectorTaskStatus(String name) { + String connectUrl = "http://" + kafkaServerHost + ":" + CONNECT_PORT + "/connectors/"; + String getStatusUrl = connectUrl + name + "/status"; + HttpGet httpGet = new HttpGet(getStatusUrl); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + StatusLine statusLine = response.getStatusLine(); + if (statusLine.getStatusCode() != 200) { + LOG.warn( + "Failed to get connector status, name={}, msg={}", + name, + statusLine.getReasonPhrase()); + } + JsonNode root = objectMapper.readTree(EntityUtils.toString(response.getEntity())); + 
JsonNode tasks = root.get("tasks"); + // tasks is an array, and only care about the first task + if (tasks != null && tasks.isArray() && tasks.size() > 0) { + JsonNode task = tasks.get(0); + return task.get("state").asText(); // RUNNING / FAILED / UNASSIGNED + } else { + LOG.warn("No task info found for connector: " + name); + } + } catch (IOException e) { + LOG.warn("Failed to get kafka connect task status, name={}", name); + } + return null; + } } diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index 8c4f207..4430dad 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -306,6 +306,31 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expectedResult, query1, 3); } + @Test + public void testTopicMutatingSmt() throws Exception { + initialize("src/test/resources/e2e/transforms/regex_router_transforms.json"); + String topic = "p-regex_router_transform_msg"; + String msg1 = "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\"}"; + produceMsg2Kafka(topic, msg1); + + String tableSql1 = + loadContent("src/test/resources/e2e/transforms/regex_router_transforms.sql"); + createTable(tableSql1); + + Thread.sleep(2000); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + List<String> expectedResult = Collections.emptyList(); + Thread.sleep(10000); + String query1 = + String.format( + "select id,col1,col2 from %s.%s order by id", + database, "regex_router_transform_msg"); + + Assert.assertEquals("FAILED", kafkaContainerService.getConnectorTaskStatus(connectorName)); + checkResult(expectedResult, query1, 3); + } + @AfterClass public static void closeInstance() { 
kafkaContainerService.deleteKafkaConnector(connectorName); diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java index 7ea1c2a..dc2e6ae 100644 --- a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java +++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java @@ -19,6 +19,10 @@ package org.apache.doris.kafka.connector.service; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Sets; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -26,12 +30,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -53,7 +60,10 @@ public class TestDorisSinkService { props.put("task_id", "1"); props.put("name", "sink-connector-test"); props.put("record.tablename.field", "table_name"); - dorisDefaultSinkService = new DorisDefaultSinkService((Map) props); + SinkTaskContext context = mock(SinkTaskContext.class); + TopicPartition assignedTp = new TopicPartition("expected-topic", 0); + when(context.assignment()).thenReturn(Sets.newHashSet(assignedTp)); + dorisDefaultSinkService = new DorisDefaultSinkService((Map) props, context); jsonConverter.configure(new HashMap<>(), false); 
} @@ -132,4 +142,32 @@ public class TestDorisSinkService { Assert.assertEquals( "test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record5)); } + + @Test + public void testCheckTopicMutating() { + SinkRecord record1 = + new SinkRecord( + "expected-topic", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + Schema.OPTIONAL_STRING_SCHEMA, + "val", + 1); + SinkRecord record2 = + new SinkRecord( + "mutated-topic", + 0, + Schema.OPTIONAL_STRING_SCHEMA, + "key", + Schema.OPTIONAL_STRING_SCHEMA, + "val", + 1); + dorisDefaultSinkService.checkTopicMutating(record1); + + Assert.assertThrows( + "Unexpected topic name: [mutated-topic] that doesn't match assigned partitions. Connector doesn't support topic mutating SMTs.", + ConnectException.class, + () -> dorisDefaultSinkService.checkTopicMutating(record2)); + } } diff --git a/src/test/resources/e2e/transforms/regex_router_transforms.json b/src/test/resources/e2e/transforms/regex_router_transforms.json new file mode 100644 index 0000000..d8aff6d --- /dev/null +++ b/src/test/resources/e2e/transforms/regex_router_transforms.json @@ -0,0 +1,26 @@ +{ + "name":"regex_router_transforms_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"p-regex_router_transform_msg", + "tasks.max":"1", + "doris.topic2table.map": "p-regex_router_transform_msg:regex_router_transform_msg,regex_router_transform_msg:regex_router_transform_msg", + "buffer.count.records":"2", + "buffer.flush.time":"11", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"transforms_msg", + "load.model":"stream_load", + "transforms": "dropPrefix", + "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.dropPrefix.regex": "p-(.*)", + "transforms.dropPrefix.replacement": "$1", + 
"key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/transforms/regex_router_transforms.sql b/src/test/resources/e2e/transforms/regex_router_transforms.sql new file mode 100644 index 0000000..e72edcd --- /dev/null +++ b/src/test/resources/e2e/transforms/regex_router_transforms.sql @@ -0,0 +1,12 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE transforms_msg.regex_router_transform_msg ( + id INT NULL, + col1 VARCHAR(20) NULL, + col2 varchar(20) NULL +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org