chuang-wang-pre opened a new pull request, #68:
URL: https://github.com/apache/doris-kafka-connector/pull/68
Currently, if `io.confluent.connect.transforms.ExtractTopic`, `org.apache.kafka.connect.transforms.RegexRouter`, or any other transform changes the topic of a record, **`doris-kafka-connector` will not successfully commit the consumer offset**. This happens because the topics passed to `org.apache.kafka.connect.sink.SinkTask#preCommit` are the original topics from before any transformations were applied. If the connector is restarted, the same records will be consumed again. More seriously, **`two_phase_commit` is enabled by default, and in that mode data will not be written to Doris at all**, because the transactions are never committed in `org.apache.kafka.connect.sink.SinkTask#preCommit`. Meanwhile, both the connector and the task remain in the normal RUNNING state, so the user has no indication that anything is wrong. Although Kafka 3.6 and later support `originalTopic` and can maintain the mapping between the original topic and its offsets, earlier versions of Kafka do not.
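To illustrate the mismatch, here is a deliberately simplified Java model, not the connector's actual code. It assumes offsets are recorded under the topic seen on the `SinkRecord` (that is, after the SMT has run), while the commit step looks them up by the original topic partition. The class and method names (`OffsetMismatchSketch`, `onRecordFlushed`, `offsetToCommit`) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-partition offset bookkeeping in a sink task.
// Illustrative only; this is not how doris-kafka-connector is actually structured.
final class OffsetMismatchSketch {

    // Last flushed offset per "topic-partition", keyed by the topic seen on the record,
    // i.e. AFTER any Single Message Transform has rewritten it.
    static final Map<String, Long> flushedOffsets = new HashMap<>();

    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Called once a record has been flushed; recordTopic is the post-SMT topic.
    static void onRecordFlushed(String recordTopic, int partition, long offset) {
        flushedOffsets.merge(key(recordTopic, partition), offset, Long::max);
    }

    // The commit step asks about the ORIGINAL topic partitions assigned to the task.
    // Returns the offset to commit, or null when nothing is known for that partition.
    static Long offsetToCommit(String originalTopic, int partition) {
        Long last = flushedOffsets.get(key(originalTopic, partition));
        // Kafka commits the offset of the next record to consume, hence +1.
        return last == null ? null : last + 1;
    }

    public static void main(String[] args) {
        // RegexRouter rewrote "original_topic" to "transformed_original_topic".
        onRecordFlushed("transformed_original_topic", 0, 41L);
        System.out.println(offsetToCommit("original_topic", 0));             // null: nothing to commit
        System.out.println(offsetToCommit("transformed_original_topic", 0)); // 42
    }
}
```

In the real Kafka Connect runtime, `preCommit` receives a `Map<TopicPartition, OffsetAndMetadata>` keyed by the partitions assigned to the task, which use the original topic names, so an offset tracked only under the rewritten topic never reaches the commit.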
So, in my opinion, it is better to throw an exception directly in this situation instead of failing silently. For example, with the following connector configuration:

> {
> "name":"test_connector",
> "config":{
> "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
> "topics":"original_topic",
> "tasks.max":"1",
> "doris.topic2table.map": "original_topic:test_table,transformed_original_topic:test_table",
> "buffer.count.records":"2",
> "buffer.flush.time":"11",
> "buffer.size.bytes":"10000000",
> "doris.urls":"127.0.0.1",
> "doris.user":"root",
> "doris.password":"",
> "doris.http.port":"8030",
> "doris.query.port":"9030",
> "doris.database":"transforms_msg",
> "load.model":"stream_load",
> "transforms": "AddPrefix",
> "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
> "transforms.AddPrefix.regex": ".*",
> "transforms.AddPrefix.replacement": "transformed_$0"
> }
> }

the connector now fails with the following error when consuming data:

`Unexpected topic: [transformed_original_topic] in SinkRecord. This may be caused by a Single Message Transform (SMT) modifying the topic. Please check your connector configuration.`
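The proposed check can be sketched as follows. This is a minimal, hypothetical Java version, not the PR's code. It assumes the guard compares each record's topic against the connector's `topics` setting (a comma-separated list) and uses `IllegalStateException` as a stand-in for whatever exception type the connector actually throws. `TopicGuardSketch` and `checkTopic` are illustrative names; the message text matches the error quoted in this description.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical guard that rejects records whose topic was rewritten by an SMT.
final class TopicGuardSketch {

    private final Set<String> subscribedTopics;

    // topicsConfig is assumed to be the comma-separated value of the "topics" property.
    TopicGuardSketch(String topicsConfig) {
        this.subscribedTopics = Arrays.stream(topicsConfig.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // Intended to be called for every record received in put(), before it is buffered.
    // Failing here stops the task instead of letting it run with uncommitted offsets.
    void checkTopic(String recordTopic) {
        if (!subscribedTopics.contains(recordTopic)) {
            throw new IllegalStateException("Unexpected topic: [" + recordTopic + "] in SinkRecord. "
                    + "This may be caused by a Single Message Transform (SMT) modifying the topic. "
                    + "Please check your connector configuration.");
        }
    }

    public static void main(String[] args) {
        TopicGuardSketch guard = new TopicGuardSketch("original_topic");
        guard.checkTopic("original_topic"); // accepted
        try {
            guard.checkTopic("transformed_original_topic");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This sketch only handles an explicit `topics` list; a connector subscribed through `topics.regex` would need to match against the pattern instead. On Kafka 3.6 and later, an alternative to rejecting such records is to map them back through `SinkRecord#originalTopic()`.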