DongLiang-0 commented on code in PR #55: URL: https://github.com/apache/doris-kafka-connector/pull/55#discussion_r1895527089
########## src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java: ########## @@ -161,12 +177,66 @@ public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.keySet() .forEach( tp -> { - String name = getNameIndex(tp.topic(), tp.partition()); - writer.get(name).commit(tp.partition()); + String tpName = getNameIndex(tp.topic(), tp.partition()); + // commit all writers that match the topic and partition + for (Map.Entry<String, DorisWriter> entry : writer.entrySet()) { + if (entry.getKey().startsWith(tpName)) { + entry.getValue().commit(tp.partition()); + } + } }); } + /** + * Get the table name in doris for the given topic. If the table name is not found in the + * config, use the topic name as the table name. + * + * @param topic topic name + * @return table name in doris + */ + private String getTopicMapTableInConfig(String topic) { + String table = dorisOptions.getTopicMapTable(topic); + if (StringUtils.isEmpty(table)) { + return topic; + } + return table; + } + + /** + * Get the table name in doris for the given record. 
+ * + * @param record sink record + * @return table name in doris + */ + private String getSinkDorisTableName(SinkRecord record) { + String defaultTableName = getTopicMapTableInConfig(record.topic()); + String field = dorisOptions.getTableField(); + // if the field is not set, use the table name in the config + if (StringUtils.isEmpty(field)) { + return defaultTableName; + } + if (!(record.value() instanceof Map)) { + LOG.warn("Only Map objects supported for The 'doris.table.field' configuration"); Review Comment: LOG.warn("Only Map objects supported for The 'doris.table.field' configuration, field={}, record type={}", field, record.value().getClass().getName()); ########## src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java: ########## @@ -161,12 +177,66 @@ public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.keySet() .forEach( tp -> { - String name = getNameIndex(tp.topic(), tp.partition()); - writer.get(name).commit(tp.partition()); + String tpName = getNameIndex(tp.topic(), tp.partition()); + // commit all writers that match the topic and partition + for (Map.Entry<String, DorisWriter> entry : writer.entrySet()) { + if (entry.getKey().startsWith(tpName)) { + entry.getValue().commit(tp.partition()); + } + } }); } + /** + * Get the table name in doris for the given topic. If the table name is not found in the + * config, use the topic name as the table name. + * + * @param topic topic name + * @return table name in doris + */ + private String getTopicMapTableInConfig(String topic) { + String table = dorisOptions.getTopicMapTable(topic); + if (StringUtils.isEmpty(table)) { + return topic; + } + return table; + } + + /** + * Get the table name in doris for the given record. 
+ * + * @param record sink record + * @return table name in doris + */ + private String getSinkDorisTableName(SinkRecord record) { + String defaultTableName = getTopicMapTableInConfig(record.topic()); + String field = dorisOptions.getTableField(); + // if the field is not set, use the table name in the config + if (StringUtils.isEmpty(field)) { + return defaultTableName; + } + if (!(record.value() instanceof Map)) { + LOG.warn("Only Map objects supported for The 'doris.table.field' configuration"); + return defaultTableName; + } + Map<String, Object> map = (Map<String, Object>) record.value(); + // if the field is not found in the record, use the table name in the config + return map.getOrDefault(field, defaultTableName).toString(); + } + private static String getNameIndex(String topic, int partition) { return topic + "_" + partition; } + + /** + * Parse the writer unique key + * + * @param topic topic name + * @param partition partition number + * @param tableName table name + * @return writer key + */ + private static String getWriterKey(String topic, int partition, String tableName) { + return topic + "_" + partition + "_" + tableName; Review Comment: If the tableName is the same as `dorisOptions.getTopicMapTable(topic)`, could we avoid appending the tableName to the writer key? ########## src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java: ########## @@ -161,12 +177,66 @@ public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.keySet() .forEach( tp -> { - String name = getNameIndex(tp.topic(), tp.partition()); - writer.get(name).commit(tp.partition()); + String tpName = getNameIndex(tp.topic(), tp.partition()); + // commit all writers that match the topic and partition + for (Map.Entry<String, DorisWriter> entry : writer.entrySet()) { + if (entry.getKey().startsWith(tpName)) { + entry.getValue().commit(tp.partition()); + } + } }); } + /** + * Get the table name in doris for the given topic.
If the table name is not found in the + * config, use the topic name as the table name. + * + * @param topic topic name + * @return table name in doris + */ + private String getTopicMapTableInConfig(String topic) { Review Comment: Since the table name cannot be configured as empty, can this method be removed and `dorisOptions.getTopicMapTable()` be called directly? ########## src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java: ########## @@ -161,12 +177,66 @@ public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.keySet() .forEach( tp -> { - String name = getNameIndex(tp.topic(), tp.partition()); - writer.get(name).commit(tp.partition()); + String tpName = getNameIndex(tp.topic(), tp.partition()); + // commit all writers that match the topic and partition + for (Map.Entry<String, DorisWriter> entry : writer.entrySet()) { + if (entry.getKey().startsWith(tpName)) { + entry.getValue().commit(tp.partition()); + } + } }); } + /** + * Get the table name in doris for the given topic. If the table name is not found in the + * config, use the topic name as the table name. + * + * @param topic topic name + * @return table name in doris + */ + private String getTopicMapTableInConfig(String topic) { + String table = dorisOptions.getTopicMapTable(topic); + if (StringUtils.isEmpty(table)) { + return topic; + } + return table; + } + + /** + * Get the table name in doris for the given record. + * + * @param record sink record + * @return table name in doris + */ + private String getSinkDorisTableName(SinkRecord record) { Review Comment: Could you add a test case for this method?
########## src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java: ########## @@ -161,12 +177,66 @@ public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.keySet() .forEach( tp -> { - String name = getNameIndex(tp.topic(), tp.partition()); - writer.get(name).commit(tp.partition()); + String tpName = getNameIndex(tp.topic(), tp.partition()); + // commit all writers that match the topic and partition + for (Map.Entry<String, DorisWriter> entry : writer.entrySet()) { + if (entry.getKey().startsWith(tpName)) { + entry.getValue().commit(tp.partition()); + } + } }); } + /** + * Get the table name in doris for the given topic. If the table name is not found in the + * config, use the topic name as the table name. + * + * @param topic topic name + * @return table name in doris + */ + private String getTopicMapTableInConfig(String topic) { + String table = dorisOptions.getTopicMapTable(topic); + if (StringUtils.isEmpty(table)) { + return topic; + } + return table; + } + + /** + * Get the table name in doris for the given record. + * + * @param record sink record + * @return table name in doris + */ + private String getSinkDorisTableName(SinkRecord record) { + String defaultTableName = getTopicMapTableInConfig(record.topic()); + String field = dorisOptions.getTableField(); + // if the field is not set, use the table name in the config + if (StringUtils.isEmpty(field)) { + return defaultTableName; + } + if (!(record.value() instanceof Map)) { + LOG.warn("Only Map objects supported for The 'doris.table.field' configuration"); + return defaultTableName; + } + Map<String, Object> map = (Map<String, Object>) record.value(); + // if the field is not found in the record, use the table name in the config + return map.getOrDefault(field, defaultTableName).toString(); Review Comment: If the `record` does not contain the table-name field, or contains it with a null value, will this throw a NullPointerException? (`getOrDefault` returns the default only when the key is absent; a key mapped to null returns null, and `.toString()` then throws.)
The `defaultTableName` should be returned in that case, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org