chuang-wang-pre commented on code in PR #55:
URL: https://github.com/apache/doris-kafka-connector/pull/55#discussion_r1895707494

##########
src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java:
##########
@@ -161,12 +177,66 @@ public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
         offsets.keySet()
                 .forEach(
                         tp -> {
-                            String name = getNameIndex(tp.topic(), tp.partition());
-                            writer.get(name).commit(tp.partition());
+                            String tpName = getNameIndex(tp.topic(), tp.partition());
+                            // commit all writers that match the topic and partition
+                            for (Map.Entry<String, DorisWriter> entry : writer.entrySet()) {
+                                if (entry.getKey().startsWith(tpName)) {
+                                    entry.getValue().commit(tp.partition());
+                                }
+                            }
                         });
     }
 
+    /**
+     * Get the table name in doris for the given topic. If the table name is not found in the
+     * config, use the topic name as the table name.
+     *
+     * @param topic topic name
+     * @return table name in doris
+     */
+    private String getTopicMapTableInConfig(String topic) {

Review Comment:
   Yes, I deleted this code. However, the configuration item `doris.topic2table.map` can be left unconfigured; in that case the `topic` name is used as the `table` name by default. I have placed this logic in `DorisOptions`.
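
   For illustration, the fallback described above could look roughly like the sketch below. This is not the actual `DorisOptions` code: the class name `TopicTableResolver`, the method `tableFor`, and the `topic1:table1,topic2:table2` value format for `doris.topic2table.map` are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper illustrating the "topic name as default table name" fallback.
 * It is a sketch only and does not mirror the connector's real DorisOptions class.
 */
final class TopicTableResolver {
    private final Map<String, String> topicToTable = new HashMap<>();

    /**
     * @param mapping value of the mapping option; assumed format
     *     "topic1:table1,topic2:table2". May be null or blank when unconfigured.
     */
    TopicTableResolver(String mapping) {
        if (mapping == null || mapping.trim().isEmpty()) {
            return; // nothing configured: every topic falls back to its own name
        }
        for (String pair : mapping.split(",")) {
            String[] parts = pair.trim().split(":");
            if (parts.length != 2
                    || parts[0].trim().isEmpty()
                    || parts[1].trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Invalid topic-to-table entry: " + pair);
            }
            topicToTable.put(parts[0].trim(), parts[1].trim());
        }
    }

    /** Returns the mapped table, or the topic name itself when no mapping exists. */
    String tableFor(String topic) {
        return topicToTable.getOrDefault(topic, topic);
    }
}
```

   With this sketch, `new TopicTableResolver(null).tableFor("orders")` returns the topic name `orders`, while a resolver built from `"orders:orders_tbl"` returns `orders_tbl` for that topic and still falls back to the topic name for any topic that is not listed.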