chuang-wang-pre opened a new pull request, #68:
URL: https://github.com/apache/doris-kafka-connector/pull/68

   Currently, if `io.confluent.connect.transforms.ExtractTopic`, 
`org.apache.kafka.connect.transforms.RegexRouter`, or another transform 
changes the topic of a record, **`doris-kafka-connector` fails to commit the 
consumer offset**, because in 
`org.apache.kafka.connect.sink.SinkTask#preCommit` the topic is the original 
topic from before any transformations were applied. If the connector is 
restarted, the same records are consumed again. 
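
To illustrate the mismatch, here is a minimal sketch that uses plain Java maps instead of the Kafka Connect API. The class `OffsetMismatchSketch` and its methods are hypothetical and are not the connector's code. The sketch only assumes that the sink tracks flushed offsets under the topic name carried by each (transformed) record, while the commit step asks about the original topic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; this is not doris-kafka-connector code.
class OffsetMismatchSketch {
    // Offsets the sink has flushed, keyed by "topic-partition" as seen in the record.
    private final Map<String, Long> flushedOffsets = new HashMap<>();

    // Called with the topic carried by the record, i.e. after any transform ran.
    void recordFlushed(String recordTopic, int partition, long offset) {
        flushedOffsets.put(recordTopic + "-" + partition, offset);
    }

    // Mimics a preCommit-style lookup, which is keyed by the original topic.
    Optional<Long> committableOffset(String originalTopic, int partition) {
        return Optional.ofNullable(flushedOffsets.get(originalTopic + "-" + partition));
    }

    public static void main(String[] args) {
        OffsetMismatchSketch sink = new OffsetMismatchSketch();
        sink.recordFlushed("transformed_original_topic", 0, 41L);
        // The lookup by original topic finds nothing, so no offset would be committed.
        System.out.println(sink.committableOffset("original_topic", 0).isPresent());
    }
}
```

Because the two sides use different keys, the lookup comes back empty even though data was flushed.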
   
   More seriously, because **`two_phase_commit` is enabled by default, the data 
is not written to Doris**: the transactions are never committed in 
`org.apache.kafka.connect.sink.SinkTask#preCommit`. Meanwhile, both the 
connector and the task stay in a normal RUNNING state, so the user is unaware 
of the problem. 
   
   Kafka 3.6 and later support `originalTopic` and can maintain the mapping 
between the original topic and offset, but earlier Kafka versions do not. So 
personally, I think it is better to throw an exception directly in this 
situation.
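
The fail-fast check proposed here could look roughly like the following sketch. It is self-contained and does not use the Kafka Connect API. `TopicGuard`, `checkTopic`, and the choice of `IllegalStateException` are assumptions for illustration, and the actual change may use a different exception type and code path. Only the error message text is taken from this pull request.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; this is not doris-kafka-connector code.
class TopicGuard {
    // Topics the task was originally assigned (the connector's "topics" setting).
    private final Set<String> assignedTopics;

    TopicGuard(Collection<String> assignedTopics) {
        this.assignedTopics = new HashSet<>(assignedTopics);
    }

    // Rejects a record whose topic is not one of the assigned topics,
    // which is what happens after a transform such as RegexRouter renames it.
    void checkTopic(String recordTopic) {
        if (!assignedTopics.contains(recordTopic)) {
            throw new IllegalStateException(
                "Unexpected topic: [" + recordTopic + "] in SinkRecord. "
                    + "This may be caused by a Single Message Transform (SMT) "
                    + "modifying the topic. Please check your connector configuration.");
        }
    }
}
```

Failing on the first unexpected record surfaces the misconfiguration immediately, rather than leaving the task in RUNNING state with nothing committed.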
   
   For example, with the following connector configuration:
   
   > {
   >   "name":"test_connector",
   >   "config":{
   >     "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
   >     "topics":"original_topic",
   >     "tasks.max":"1",
   >     "doris.topic2table.map":"original_topic:test_table,transformed_original_topic:test_table",
   >     "buffer.count.records":"2",
   >     "buffer.flush.time":"11",
   >     "buffer.size.bytes":"10000000",
   >     "doris.urls":"127.0.0.1",
   >     "doris.user":"root",
   >     "doris.password":"",
   >     "doris.http.port":"8030",
   >     "doris.query.port":"9030",
   >     "doris.database":"transforms_msg",
   >     "load.model":"stream_load",
   >     "transforms": "AddPrefix",
   >     "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
   >     "transforms.AddPrefix.regex": ".*",
   >     "transforms.AddPrefix.replacement": "transformed_$0"
   >   }
   > }
   
   an error is then raised when data is consumed:
   `Unexpected topic: [transformed_original_topic] in SinkRecord. This may be 
caused by a Single Message Transform (SMT) modifying the topic. Please check 
your connector configuration.`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

